This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 467791fea369a30a8334dacde49baedb7f1d7767
Author: codenohup <[email protected]>
AuthorDate: Wed Sep 11 16:43:21 2024 +0800

    [hotfix][API] DataStream should also support connectAndProcess with ProcessConfigurableStream
    
    Signed-off-by: codenohup <[email protected]>
---
 .../api/stream/KeyedPartitionStream.java           | 13 ++---
 .../api/stream/NonKeyedPartitionStream.java        | 10 ++--
 .../impl/stream/KeyedPartitionStreamImpl.java      | 56 +++++++-------------
 .../impl/stream/NonKeyedPartitionStreamImpl.java   | 42 +++------------
 ...essConfigurableAndKeyedPartitionStreamImpl.java |  9 +++-
 ...ConfigurableAndNonKeyedPartitionStreamImpl.java |  6 ++-
 ...onfigurableAndTwoKeyedPartitionStreamsImpl.java | 58 +++++++++++++++++++++
 ...figurableAndTwoNonKeyedPartitionStreamImpl.java | 59 ++++++++++++++++++++++
 .../impl/stream/KeyedPartitionStreamImplTest.java  | 14 ++---
 .../stream/NonKeyedPartitionStreamImplTest.java    |  5 +-
 10 files changed, 178 insertions(+), 94 deletions(-)

diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java
index b7a3de8e3c7..2ca8feca275 100644
--- 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java
@@ -26,7 +26,7 @@ import 
org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFu
 import 
org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
 import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
 import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream;
-import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.TwoNonKeyedPartitionStreams;
+import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream;
 
 /**
  * This interface represents a kind of partitioned data stream. For this 
stream, each key is a
@@ -76,9 +76,9 @@ public interface KeyedPartitionStream<K, T> extends 
DataStream {
      * @param processFunction to perform two output operation.
      * @param keySelector1 to select the key of first output.
      * @param keySelector2 to select the key of second output.
-     * @return new {@link TwoKeyedPartitionStreams} with this operation.
+     * @return new {@link ProcessConfigurableAndTwoKeyedPartitionStreams} with 
this operation.
      */
-    <OUT1, OUT2> TwoKeyedPartitionStreams<K, OUT1, OUT2> process(
+    <OUT1, OUT2> ProcessConfigurableAndTwoKeyedPartitionStreams<K, OUT1, OUT2> 
process(
             TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction,
             KeySelector<OUT1, K> keySelector1,
             KeySelector<OUT2, K> keySelector2);
@@ -87,9 +87,9 @@ public interface KeyedPartitionStream<K, T> extends 
DataStream {
      * Apply a two output operation to this {@link KeyedPartitionStream}.
      *
      * @param processFunction to perform two output operation.
-     * @return new {@link TwoNonKeyedPartitionStreams} with this operation.
+     * @return new {@link ProcessConfigurableAndTwoNonKeyedPartitionStream} 
with this operation.
      */
-    <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
+    <OUT1, OUT2> ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, OUT2> 
process(
             TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);
 
     /**
@@ -209,7 +209,8 @@ public interface KeyedPartitionStream<K, T> extends 
DataStream {
      * the return value of operation with two output.
      */
     @Experimental
-    interface TwoKeyedPartitionStreams<K, T1, T2> {
+    interface ProcessConfigurableAndTwoKeyedPartitionStreams<K, T1, T2>
+            extends 
ProcessConfigurable<ProcessConfigurableAndTwoKeyedPartitionStreams<K, T1, T2>> {
         /** Get the first stream. */
         ProcessConfigurableAndKeyedPartitionStream<K, T1> getFirst();
 
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java
index 5471c94e875..165071ce75b 100644
--- 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java
@@ -47,7 +47,7 @@ public interface NonKeyedPartitionStream<T> extends 
DataStream {
      * @param processFunction to perform two output operation.
      * @return new stream with this operation.
      */
-    <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
+    <OUT1, OUT2> ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, OUT2> 
process(
             TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);
 
     /**
@@ -114,11 +114,13 @@ public interface NonKeyedPartitionStream<T> extends 
DataStream {
      * used as the return value of operation with two output.
      */
     @Experimental
-    interface TwoNonKeyedPartitionStreams<T1, T2> {
+    interface ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, OUT2>
+            extends ProcessConfigurable<
+                    ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, 
OUT2>> {
         /** Get the first stream. */
-        ProcessConfigurableAndNonKeyedPartitionStream<T1> getFirst();
+        ProcessConfigurableAndNonKeyedPartitionStream<OUT1> getFirst();
 
         /** Get the second stream. */
-        ProcessConfigurableAndNonKeyedPartitionStream<T2> getSecond();
+        ProcessConfigurableAndNonKeyedPartitionStream<OUT2> getSecond();
     }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
index 85b6c1531cd..d9650020478 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
@@ -34,13 +34,12 @@ import org.apache.flink.datastream.api.stream.GlobalStream;
 import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
 import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
 import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream;
-import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.TwoNonKeyedPartitionStreams;
+import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream;
 import org.apache.flink.datastream.api.stream.ProcessConfigurable;
 import org.apache.flink.datastream.impl.operators.KeyedProcessOperator;
 import 
org.apache.flink.datastream.impl.operators.KeyedTwoInputBroadcastProcessOperator;
 import 
org.apache.flink.datastream.impl.operators.KeyedTwoInputNonBroadcastProcessOperator;
 import 
org.apache.flink.datastream.impl.operators.KeyedTwoOutputProcessOperator;
-import 
org.apache.flink.datastream.impl.stream.NonKeyedPartitionStreamImpl.TwoNonKeyedPartitionStreamsImpl;
 import org.apache.flink.datastream.impl.utils.StreamUtils;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import 
org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation;
@@ -153,7 +152,7 @@ public class KeyedPartitionStreamImpl<K, V> extends 
AbstractDataStream<V>
     }
 
     @Override
-    public <OUT1, OUT2> TwoKeyedPartitionStreams<K, OUT1, OUT2> process(
+    public <OUT1, OUT2> ProcessConfigurableAndTwoKeyedPartitionStreams<K, 
OUT1, OUT2> process(
             TwoOutputStreamProcessFunction<V, OUT1, OUT2> processFunction,
             KeySelector<OUT1, K> keySelector1,
             KeySelector<OUT2, K> keySelector2) {
@@ -202,11 +201,12 @@ public class KeyedPartitionStreamImpl<K, V> extends 
AbstractDataStream<V>
                         TypeExtractor.getKeySelectorTypes(
                                 keySelector2, nonKeyedSideStream.getType()));
         environment.addOperator(mainOutputTransform);
-        return TwoKeyedPartitionStreamsImpl.of(keyedMainOutputStream, 
keyedSideOutputStream);
+        return new ProcessConfigurableAndTwoKeyedPartitionStreamsImpl<>(
+                environment, mainOutputTransform, keyedMainOutputStream, 
keyedSideOutputStream);
     }
 
     @Override
-    public <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
+    public <OUT1, OUT2> ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, 
OUT2> process(
             TwoOutputStreamProcessFunction<V, OUT1, OUT2> processFunction) {
         validateStates(
                 processFunction.usesStates(),
@@ -235,7 +235,8 @@ public class KeyedPartitionStreamImpl<K, V> extends 
AbstractDataStream<V>
                 new NonKeyedPartitionStreamImpl<>(
                         environment, 
firstStream.getSideOutputTransform(secondOutputTag));
         environment.addOperator(firstTransformation);
-        return TwoNonKeyedPartitionStreamsImpl.of(firstStream, secondStream);
+        return new ProcessConfigurableAndTwoNonKeyedPartitionStreamImpl<>(
+                environment, firstTransformation, firstStream, secondStream);
     }
 
     @Override
@@ -246,7 +247,11 @@ public class KeyedPartitionStreamImpl<K, V> extends 
AbstractDataStream<V>
                 processFunction.usesStates(),
                 new HashSet<>(
                         
Collections.singletonList(StateDeclaration.RedistributionMode.IDENTICAL)));
-
+        other =
+                other instanceof ProcessConfigurableAndKeyedPartitionStreamImpl
+                        ? ((ProcessConfigurableAndKeyedPartitionStreamImpl) 
other)
+                                .getKeyedPartitionStream()
+                        : other;
         TypeInformation<OUT> outTypeInfo =
                 
StreamUtils.getOutputTypeForTwoInputNonBroadcastProcessFunction(
                         processFunction,
@@ -282,7 +287,11 @@ public class KeyedPartitionStreamImpl<K, V> extends 
AbstractDataStream<V>
                         processFunction,
                         getType(),
                         ((KeyedPartitionStreamImpl<K, T_OTHER>) 
other).getType());
-
+        other =
+                other instanceof ProcessConfigurableAndKeyedPartitionStreamImpl
+                        ? ((ProcessConfigurableAndKeyedPartitionStreamImpl) 
other)
+                                .getKeyedPartitionStream()
+                        : other;
         KeyedTwoInputNonBroadcastProcessOperator<K, V, T_OTHER, OUT> 
processOperator =
                 new 
KeyedTwoInputNonBroadcastProcessOperator<>(processFunction, newKeySelector);
         Transformation<OUT> outTransformation =
@@ -413,35 +422,4 @@ public class KeyedPartitionStreamImpl<K, V> extends 
AbstractDataStream<V>
     public BroadcastStream<V> broadcast() {
         return new BroadcastStreamImpl<>(environment, getTransformation());
     }
-
-    static class TwoKeyedPartitionStreamsImpl<K, OUT1, OUT2>
-            implements TwoKeyedPartitionStreams<K, OUT1, OUT2> {
-
-        private final KeyedPartitionStreamImpl<K, OUT1> firstStream;
-
-        private final KeyedPartitionStreamImpl<K, OUT2> secondStream;
-
-        public static <K, OUT1, OUT2> TwoKeyedPartitionStreamsImpl<K, OUT1, 
OUT2> of(
-                KeyedPartitionStreamImpl<K, OUT1> firstStream,
-                KeyedPartitionStreamImpl<K, OUT2> secondStream) {
-            return new TwoKeyedPartitionStreamsImpl<>(firstStream, 
secondStream);
-        }
-
-        private TwoKeyedPartitionStreamsImpl(
-                KeyedPartitionStreamImpl<K, OUT1> firstStream,
-                KeyedPartitionStreamImpl<K, OUT2> secondStream) {
-            this.firstStream = firstStream;
-            this.secondStream = secondStream;
-        }
-
-        @Override
-        public ProcessConfigurableAndKeyedPartitionStream<K, OUT1> getFirst() {
-            return StreamUtils.wrapWithConfigureHandle(firstStream);
-        }
-
-        @Override
-        public ProcessConfigurableAndKeyedPartitionStream<K, OUT2> getSecond() 
{
-            return StreamUtils.wrapWithConfigureHandle(secondStream);
-        }
-    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java
index dda90d290be..82b81147f71 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java
@@ -81,7 +81,7 @@ public class NonKeyedPartitionStreamImpl<T> extends 
AbstractDataStream<T>
     }
 
     @Override
-    public <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
+    public <OUT1, OUT2> ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, 
OUT2> process(
             TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction) {
         validateStates(
                 processFunction.usesStates(),
@@ -107,7 +107,8 @@ public class NonKeyedPartitionStreamImpl<T> extends 
AbstractDataStream<T>
                 new NonKeyedPartitionStreamImpl<>(
                         environment, 
firstStream.getSideOutputTransform(secondOutputTag));
         environment.addOperator(outTransformation);
-        return TwoNonKeyedPartitionStreamsImpl.of(firstStream, secondStream);
+        return new ProcessConfigurableAndTwoNonKeyedPartitionStreamImpl<>(
+                environment, outTransformation, firstStream, secondStream);
     }
 
     @Override
@@ -120,7 +121,11 @@ public class NonKeyedPartitionStreamImpl<T> extends 
AbstractDataStream<T>
                         Arrays.asList(
                                 StateDeclaration.RedistributionMode.NONE,
                                 
StateDeclaration.RedistributionMode.IDENTICAL)));
-
+        other =
+                other instanceof 
ProcessConfigurableAndNonKeyedPartitionStreamImpl
+                        ? ((ProcessConfigurableAndNonKeyedPartitionStreamImpl) 
other)
+                                .getNonKeyedPartitionStream()
+                        : other;
         TypeInformation<OUT> outTypeInfo =
                 
StreamUtils.getOutputTypeForTwoInputNonBroadcastProcessFunction(
                         processFunction,
@@ -205,35 +210,4 @@ public class NonKeyedPartitionStreamImpl<T> extends 
AbstractDataStream<T>
     public BroadcastStream<T> broadcast() {
         return new BroadcastStreamImpl<>(environment, getTransformation());
     }
-
-    static class TwoNonKeyedPartitionStreamsImpl<OUT1, OUT2>
-            implements TwoNonKeyedPartitionStreams<OUT1, OUT2> {
-
-        private final NonKeyedPartitionStreamImpl<OUT1> firstStream;
-
-        private final NonKeyedPartitionStreamImpl<OUT2> secondStream;
-
-        public static <OUT1, OUT2> TwoNonKeyedPartitionStreamsImpl<OUT1, OUT2> 
of(
-                NonKeyedPartitionStreamImpl<OUT1> firstStream,
-                NonKeyedPartitionStreamImpl<OUT2> secondStream) {
-            return new TwoNonKeyedPartitionStreamsImpl<>(firstStream, 
secondStream);
-        }
-
-        private TwoNonKeyedPartitionStreamsImpl(
-                NonKeyedPartitionStreamImpl<OUT1> firstStream,
-                NonKeyedPartitionStreamImpl<OUT2> secondStream) {
-            this.firstStream = firstStream;
-            this.secondStream = secondStream;
-        }
-
-        @Override
-        public ProcessConfigurableAndNonKeyedPartitionStream<OUT1> getFirst() {
-            return StreamUtils.wrapWithConfigureHandle(firstStream);
-        }
-
-        @Override
-        public ProcessConfigurableAndNonKeyedPartitionStream<OUT2> getSecond() 
{
-            return StreamUtils.wrapWithConfigureHandle(secondStream);
-        }
-    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndKeyedPartitionStreamImpl.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndKeyedPartitionStreamImpl.java
index c2c11a4c911..ccaacb0cbb8 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndKeyedPartitionStreamImpl.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndKeyedPartitionStreamImpl.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.datastream.api.stream.KeyedPartitionStream;
 import 
org.apache.flink.datastream.api.stream.KeyedPartitionStream.ProcessConfigurableAndKeyedPartitionStream;
 import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
 import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream;
+import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream;
 import org.apache.flink.datastream.api.stream.ProcessConfigurable;
 
 /**
@@ -60,7 +61,7 @@ public class 
ProcessConfigurableAndKeyedPartitionStreamImpl<K, T>
     }
 
     @Override
-    public <OUT1, OUT2> TwoKeyedPartitionStreams<K, OUT1, OUT2> process(
+    public <OUT1, OUT2> ProcessConfigurableAndTwoKeyedPartitionStreams<K, 
OUT1, OUT2> process(
             TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction,
             KeySelector<OUT1, K> keySelector1,
             KeySelector<OUT2, K> keySelector2) {
@@ -68,7 +69,7 @@ public class 
ProcessConfigurableAndKeyedPartitionStreamImpl<K, T>
     }
 
     @Override
-    public <OUT1, OUT2> 
NonKeyedPartitionStream.TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
+    public <OUT1, OUT2> ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, 
OUT2> process(
             TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction) {
         return stream.process(processFunction);
     }
@@ -127,4 +128,8 @@ public class 
ProcessConfigurableAndKeyedPartitionStreamImpl<K, T>
     public ProcessConfigurable<?> toSink(Sink<T> sink) {
         return stream.toSink(sink);
     }
+
+    public KeyedPartitionStreamImpl<K, T> getKeyedPartitionStream() {
+        return stream;
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndNonKeyedPartitionStreamImpl.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndNonKeyedPartitionStreamImpl.java
index 2ed14e67ca7..82f958d7e0a 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndNonKeyedPartitionStreamImpl.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndNonKeyedPartitionStreamImpl.java
@@ -53,7 +53,7 @@ public class 
ProcessConfigurableAndNonKeyedPartitionStreamImpl<T>
     }
 
     @Override
-    public <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
+    public <OUT1, OUT2> ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, 
OUT2> process(
             TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction) {
         return stream.process(processFunction);
     }
@@ -96,4 +96,8 @@ public class 
ProcessConfigurableAndNonKeyedPartitionStreamImpl<T>
     public ProcessConfigurable<?> toSink(Sink<T> sink) {
         return stream.toSink(sink);
     }
+
+    public NonKeyedPartitionStreamImpl<T> getNonKeyedPartitionStream() {
+        return stream;
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndTwoKeyedPartitionStreamsImpl.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndTwoKeyedPartitionStreamsImpl.java
new file mode 100644
index 00000000000..617b36255b5
--- /dev/null
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndTwoKeyedPartitionStreamsImpl.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import 
org.apache.flink.datastream.api.stream.KeyedPartitionStream.ProcessConfigurableAndKeyedPartitionStream;
+import 
org.apache.flink.datastream.api.stream.KeyedPartitionStream.ProcessConfigurableAndTwoKeyedPartitionStreams;
+import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
+import org.apache.flink.datastream.impl.utils.StreamUtils;
+
+/**
+ * {@link ProcessConfigurableAndTwoKeyedPartitionStreamsImpl} is used to hold 
the two output keyed
+ * streams and provide methods used for configuration.
+ */
+public class ProcessConfigurableAndTwoKeyedPartitionStreamsImpl<K, OUT1, OUT2>
+        extends ProcessConfigureHandle<
+                OUT1, ProcessConfigurableAndTwoKeyedPartitionStreams<K, OUT1, 
OUT2>>
+        implements ProcessConfigurableAndTwoKeyedPartitionStreams<K, OUT1, 
OUT2> {
+    private final KeyedPartitionStreamImpl<K, OUT1> firstStream;
+
+    private final KeyedPartitionStreamImpl<K, OUT2> secondStream;
+
+    public ProcessConfigurableAndTwoKeyedPartitionStreamsImpl(
+            ExecutionEnvironmentImpl environment,
+            Transformation<OUT1> transformation,
+            KeyedPartitionStreamImpl<K, OUT1> firstStream,
+            KeyedPartitionStreamImpl<K, OUT2> secondStream) {
+        super(environment, transformation);
+        this.firstStream = firstStream;
+        this.secondStream = secondStream;
+    }
+
+    @Override
+    public ProcessConfigurableAndKeyedPartitionStream<K, OUT1> getFirst() {
+        return StreamUtils.wrapWithConfigureHandle(firstStream);
+    }
+
+    @Override
+    public ProcessConfigurableAndKeyedPartitionStream<K, OUT2> getSecond() {
+        return StreamUtils.wrapWithConfigureHandle(secondStream);
+    }
+}
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndTwoNonKeyedPartitionStreamImpl.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndTwoNonKeyedPartitionStreamImpl.java
new file mode 100644
index 00000000000..dc880c70d2a
--- /dev/null
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndTwoNonKeyedPartitionStreamImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream;
+import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream;
+import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
+import org.apache.flink.datastream.impl.utils.StreamUtils;
+
+/**
+ * {@link ProcessConfigurableAndTwoNonKeyedPartitionStreamImpl} is used to 
hold the two output
+ * non-keyed streams and provide methods used for configuration.
+ */
+public class ProcessConfigurableAndTwoNonKeyedPartitionStreamImpl<OUT1, OUT2>
+        extends ProcessConfigureHandle<
+                OUT1, ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, 
OUT2>>
+        implements ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, 
OUT2> {
+
+    private final NonKeyedPartitionStreamImpl<OUT1> firstStream;
+
+    private final NonKeyedPartitionStreamImpl<OUT2> secondStream;
+
+    public ProcessConfigurableAndTwoNonKeyedPartitionStreamImpl(
+            ExecutionEnvironmentImpl environment,
+            Transformation<OUT1> transformation,
+            NonKeyedPartitionStreamImpl<OUT1> firstStream,
+            NonKeyedPartitionStreamImpl<OUT2> secondStream) {
+        super(environment, transformation);
+        this.firstStream = firstStream;
+        this.secondStream = secondStream;
+    }
+
+    @Override
+    public ProcessConfigurableAndNonKeyedPartitionStream<OUT1> getFirst() {
+        return StreamUtils.wrapWithConfigureHandle(firstStream);
+    }
+
+    @Override
+    public ProcessConfigurableAndNonKeyedPartitionStream<OUT2> getSecond() {
+        return StreamUtils.wrapWithConfigureHandle(secondStream);
+    }
+}
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImplTest.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImplTest.java
index db1d3ffeefe..967ea247240 100644
--- 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImplTest.java
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImplTest.java
@@ -113,12 +113,14 @@ public class KeyedPartitionStreamImplTest {
     void testProcessTwoOutput() throws Exception {
         ExecutionEnvironmentImpl env = StreamTestUtils.getEnv();
         KeyedPartitionStream<Integer, Integer> stream = createKeyedStream(env);
-        NonKeyedPartitionStream.TwoNonKeyedPartitionStreams<Integer, Long> 
resultStream1 =
-                stream.process(new NoOpTwoOutputStreamProcessFunction());
-        
assertThat(resultStream1.getFirst()).isInstanceOf(NonKeyedPartitionStream.class);
-        
assertThat(resultStream1.getSecond()).isInstanceOf(NonKeyedPartitionStream.class);
-        KeyedPartitionStream.TwoKeyedPartitionStreams<Integer, Integer, Long> 
resultStream2 =
-                stream.process(new NoOpTwoOutputStreamProcessFunction(), x -> 
x, Math::toIntExact);
+        
NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream<Integer,
 Long>
+                resultStream = stream.process(new 
NoOpTwoOutputStreamProcessFunction());
+        
assertThat(resultStream.getFirst()).isInstanceOf(NonKeyedPartitionStream.class);
+        
assertThat(resultStream.getSecond()).isInstanceOf(NonKeyedPartitionStream.class);
+        
KeyedPartitionStream.ProcessConfigurableAndTwoKeyedPartitionStreams<Integer, 
Integer, Long>
+                resultStream2 =
+                        stream.process(
+                                new NoOpTwoOutputStreamProcessFunction(), x -> 
x, Math::toIntExact);
         
assertThat(resultStream2.getFirst()).isInstanceOf(KeyedPartitionStream.class);
         
assertThat(resultStream2.getSecond()).isInstanceOf(KeyedPartitionStream.class);
         List<Transformation<?>> transformations = env.getTransformations();
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImplTest.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImplTest.java
index 4280f003b0d..6635327e22d 100644
--- 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImplTest.java
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImplTest.java
@@ -140,8 +140,9 @@ class NonKeyedPartitionStreamImplTest {
         NonKeyedPartitionStreamImpl<Integer> stream =
                 new NonKeyedPartitionStreamImpl<>(
                         env, new TestingTransformation<>("t1", Types.INT, 1));
-        NonKeyedPartitionStream.TwoNonKeyedPartitionStreams<Integer, Long> 
resultStream =
-                stream.process(new 
StreamTestUtils.NoOpTwoOutputStreamProcessFunction());
+        
NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream<Integer,
 Long>
+                resultStream =
+                        stream.process(new 
StreamTestUtils.NoOpTwoOutputStreamProcessFunction());
         
assertThat(resultStream.getFirst()).isInstanceOf(NonKeyedPartitionStream.class);
         
assertThat(resultStream.getSecond()).isInstanceOf(NonKeyedPartitionStream.class);
         List<Transformation<?>> transformations = env.getTransformations();

Reply via email to