This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 467791fea369a30a8334dacde49baedb7f1d7767 Author: codenohup <[email protected]> AuthorDate: Wed Sep 11 16:43:21 2024 +0800 [hotfix][API] DataStream should also support connectAndProcess with ProcessConfigurableStream Signed-off-by: codenohup <[email protected]> --- .../api/stream/KeyedPartitionStream.java | 13 ++--- .../api/stream/NonKeyedPartitionStream.java | 10 ++-- .../impl/stream/KeyedPartitionStreamImpl.java | 56 +++++++------------- .../impl/stream/NonKeyedPartitionStreamImpl.java | 42 +++------------ ...essConfigurableAndKeyedPartitionStreamImpl.java | 9 +++- ...ConfigurableAndNonKeyedPartitionStreamImpl.java | 6 ++- ...onfigurableAndTwoKeyedPartitionStreamsImpl.java | 58 +++++++++++++++++++++ ...figurableAndTwoNonKeyedPartitionStreamImpl.java | 59 ++++++++++++++++++++++ .../impl/stream/KeyedPartitionStreamImplTest.java | 14 ++--- .../stream/NonKeyedPartitionStreamImplTest.java | 5 +- 10 files changed, 178 insertions(+), 94 deletions(-) diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java index b7a3de8e3c7..2ca8feca275 100644 --- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java @@ -26,7 +26,7 @@ import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFu import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream; -import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.TwoNonKeyedPartitionStreams; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream; /** * This interface represents a kind of partitioned data stream. For this stream, each key is a @@ -76,9 +76,9 @@ public interface KeyedPartitionStream<K, T> extends DataStream { * @param processFunction to perform two output operation. * @param keySelector1 to select the key of first output. * @param keySelector2 to select the key of second output. - * @return new {@link TwoKeyedPartitionStreams} with this operation. + * @return new {@link ProcessConfigurableAndTwoKeyedPartitionStreams} with this operation. */ - <OUT1, OUT2> TwoKeyedPartitionStreams<K, OUT1, OUT2> process( + <OUT1, OUT2> ProcessConfigurableAndTwoKeyedPartitionStreams<K, OUT1, OUT2> process( TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction, KeySelector<OUT1, K> keySelector1, KeySelector<OUT2, K> keySelector2); @@ -87,9 +87,9 @@ public interface KeyedPartitionStream<K, T> extends DataStream { * Apply a two output operation to this {@link KeyedPartitionStream}. * * @param processFunction to perform two output operation. - * @return new {@link TwoNonKeyedPartitionStreams} with this operation. + * @return new {@link ProcessConfigurableAndTwoNonKeyedPartitionStream} with this operation. */ - <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process( + <OUT1, OUT2> ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, OUT2> process( TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction); /** @@ -209,7 +209,8 @@ public interface KeyedPartitionStream<K, T> extends DataStream { * the return value of operation with two output. */ @Experimental - interface TwoKeyedPartitionStreams<K, T1, T2> { + interface ProcessConfigurableAndTwoKeyedPartitionStreams<K, T1, T2> + extends ProcessConfigurable<ProcessConfigurableAndTwoKeyedPartitionStreams<K, T1, T2>> { /** Get the first stream. */ ProcessConfigurableAndKeyedPartitionStream<K, T1> getFirst(); diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java index 5471c94e875..165071ce75b 100644 --- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java @@ -47,7 +47,7 @@ public interface NonKeyedPartitionStream<T> extends DataStream { * @param processFunction to perform two output operation. * @return new stream with this operation. */ - <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process( + <OUT1, OUT2> ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, OUT2> process( TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction); /** @@ -114,11 +114,13 @@ public interface NonKeyedPartitionStream<T> extends DataStream { * used as the return value of operation with two output. */ @Experimental - interface TwoNonKeyedPartitionStreams<T1, T2> { + interface ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, OUT2> + extends ProcessConfigurable< + ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, OUT2>> { /** Get the first stream. */ - ProcessConfigurableAndNonKeyedPartitionStream<T1> getFirst(); + ProcessConfigurableAndNonKeyedPartitionStream<OUT1> getFirst(); /** Get the second stream. */ - ProcessConfigurableAndNonKeyedPartitionStream<T2> getSecond(); + ProcessConfigurableAndNonKeyedPartitionStream<OUT2> getSecond(); } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java index 85b6c1531cd..d9650020478 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java @@ -34,13 +34,12 @@ import org.apache.flink.datastream.api.stream.GlobalStream; import org.apache.flink.datastream.api.stream.KeyedPartitionStream; import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream; -import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.TwoNonKeyedPartitionStreams; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream; import org.apache.flink.datastream.api.stream.ProcessConfigurable; import org.apache.flink.datastream.impl.operators.KeyedProcessOperator; import org.apache.flink.datastream.impl.operators.KeyedTwoInputBroadcastProcessOperator; import org.apache.flink.datastream.impl.operators.KeyedTwoInputNonBroadcastProcessOperator; import org.apache.flink.datastream.impl.operators.KeyedTwoOutputProcessOperator; -import org.apache.flink.datastream.impl.stream.NonKeyedPartitionStreamImpl.TwoNonKeyedPartitionStreamsImpl; import org.apache.flink.datastream.impl.utils.StreamUtils; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation; @@ -153,7 +152,7 @@ public class KeyedPartitionStreamImpl<K, V> extends AbstractDataStream<V> } @Override - public <OUT1, OUT2> TwoKeyedPartitionStreams<K, OUT1, OUT2> process( + public <OUT1, OUT2> ProcessConfigurableAndTwoKeyedPartitionStreams<K, OUT1, OUT2> process( TwoOutputStreamProcessFunction<V, OUT1, OUT2> processFunction, KeySelector<OUT1, K> keySelector1, KeySelector<OUT2, K> keySelector2) { @@ -202,11 +201,12 @@ public class KeyedPartitionStreamImpl<K, V> extends AbstractDataStream<V> TypeExtractor.getKeySelectorTypes( keySelector2, nonKeyedSideStream.getType())); environment.addOperator(mainOutputTransform); - return TwoKeyedPartitionStreamsImpl.of(keyedMainOutputStream, keyedSideOutputStream); + return new ProcessConfigurableAndTwoKeyedPartitionStreamsImpl<>( + environment, mainOutputTransform, keyedMainOutputStream, keyedSideOutputStream); } @Override - public <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process( + public <OUT1, OUT2> ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, OUT2> process( TwoOutputStreamProcessFunction<V, OUT1, OUT2> processFunction) { validateStates( processFunction.usesStates(), @@ -235,7 +235,8 @@ public class KeyedPartitionStreamImpl<K, V> extends AbstractDataStream<V> new NonKeyedPartitionStreamImpl<>( environment, firstStream.getSideOutputTransform(secondOutputTag)); environment.addOperator(firstTransformation); - return TwoNonKeyedPartitionStreamsImpl.of(firstStream, secondStream); + return new ProcessConfigurableAndTwoNonKeyedPartitionStreamImpl<>( + environment, firstTransformation, firstStream, secondStream); } @Override @@ -246,7 +247,11 @@ public class KeyedPartitionStreamImpl<K, V> extends AbstractDataStream<V> processFunction.usesStates(), new HashSet<>( Collections.singletonList(StateDeclaration.RedistributionMode.IDENTICAL))); - + other = + other instanceof ProcessConfigurableAndKeyedPartitionStreamImpl + ? ((ProcessConfigurableAndKeyedPartitionStreamImpl) other) + .getKeyedPartitionStream() + : other; TypeInformation<OUT> outTypeInfo = StreamUtils.getOutputTypeForTwoInputNonBroadcastProcessFunction( processFunction, @@ -282,7 +287,11 @@ public class KeyedPartitionStreamImpl<K, V> extends AbstractDataStream<V> processFunction, getType(), ((KeyedPartitionStreamImpl<K, T_OTHER>) other).getType()); - + other = + other instanceof ProcessConfigurableAndKeyedPartitionStreamImpl + ? ((ProcessConfigurableAndKeyedPartitionStreamImpl) other) + .getKeyedPartitionStream() + : other; KeyedTwoInputNonBroadcastProcessOperator<K, V, T_OTHER, OUT> processOperator = new KeyedTwoInputNonBroadcastProcessOperator<>(processFunction, newKeySelector); Transformation<OUT> outTransformation = @@ -413,35 +422,4 @@ public class KeyedPartitionStreamImpl<K, V> extends AbstractDataStream<V> public BroadcastStream<V> broadcast() { return new BroadcastStreamImpl<>(environment, getTransformation()); } - - static class TwoKeyedPartitionStreamsImpl<K, OUT1, OUT2> - implements TwoKeyedPartitionStreams<K, OUT1, OUT2> { - - private final KeyedPartitionStreamImpl<K, OUT1> firstStream; - - private final KeyedPartitionStreamImpl<K, OUT2> secondStream; - - public static <K, OUT1, OUT2> TwoKeyedPartitionStreamsImpl<K, OUT1, OUT2> of( - KeyedPartitionStreamImpl<K, OUT1> firstStream, - KeyedPartitionStreamImpl<K, OUT2> secondStream) { - return new TwoKeyedPartitionStreamsImpl<>(firstStream, secondStream); - } - - private TwoKeyedPartitionStreamsImpl( - KeyedPartitionStreamImpl<K, OUT1> firstStream, - KeyedPartitionStreamImpl<K, OUT2> secondStream) { - this.firstStream = firstStream; - this.secondStream = secondStream; - } - - @Override - public ProcessConfigurableAndKeyedPartitionStream<K, OUT1> getFirst() { - return StreamUtils.wrapWithConfigureHandle(firstStream); - } - - @Override - public ProcessConfigurableAndKeyedPartitionStream<K, OUT2> getSecond() { - return StreamUtils.wrapWithConfigureHandle(secondStream); - } - } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java index dda90d290be..82b81147f71 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java @@ -81,7 +81,7 @@ public class NonKeyedPartitionStreamImpl<T> extends AbstractDataStream<T> } @Override - public <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process( + public <OUT1, OUT2> ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, OUT2> process( TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction) { validateStates( processFunction.usesStates(), @@ -107,7 +107,8 @@ public class NonKeyedPartitionStreamImpl<T> extends AbstractDataStream<T> new NonKeyedPartitionStreamImpl<>( environment, firstStream.getSideOutputTransform(secondOutputTag)); environment.addOperator(outTransformation); - return TwoNonKeyedPartitionStreamsImpl.of(firstStream, secondStream); + return new ProcessConfigurableAndTwoNonKeyedPartitionStreamImpl<>( + environment, outTransformation, firstStream, secondStream); } @Override @@ -120,7 +121,11 @@ public class NonKeyedPartitionStreamImpl<T> extends AbstractDataStream<T> Arrays.asList( StateDeclaration.RedistributionMode.NONE, StateDeclaration.RedistributionMode.IDENTICAL))); - + other = + other instanceof ProcessConfigurableAndNonKeyedPartitionStreamImpl + ? ((ProcessConfigurableAndNonKeyedPartitionStreamImpl) other) + .getNonKeyedPartitionStream() + : other; TypeInformation<OUT> outTypeInfo = StreamUtils.getOutputTypeForTwoInputNonBroadcastProcessFunction( processFunction, @@ -205,35 +210,4 @@ public class NonKeyedPartitionStreamImpl<T> extends AbstractDataStream<T> public BroadcastStream<T> broadcast() { return new BroadcastStreamImpl<>(environment, getTransformation()); } - - static class TwoNonKeyedPartitionStreamsImpl<OUT1, OUT2> - implements TwoNonKeyedPartitionStreams<OUT1, OUT2> { - - private final NonKeyedPartitionStreamImpl<OUT1> firstStream; - - private final NonKeyedPartitionStreamImpl<OUT2> secondStream; - - public static <OUT1, OUT2> TwoNonKeyedPartitionStreamsImpl<OUT1, OUT2> of( - NonKeyedPartitionStreamImpl<OUT1> firstStream, - NonKeyedPartitionStreamImpl<OUT2> secondStream) { - return new TwoNonKeyedPartitionStreamsImpl<>(firstStream, secondStream); - } - - private TwoNonKeyedPartitionStreamsImpl( - NonKeyedPartitionStreamImpl<OUT1> firstStream, - NonKeyedPartitionStreamImpl<OUT2> secondStream) { - this.firstStream = firstStream; - this.secondStream = secondStream; - } - - @Override - public ProcessConfigurableAndNonKeyedPartitionStream<OUT1> getFirst() { - return StreamUtils.wrapWithConfigureHandle(firstStream); - } - - @Override - public ProcessConfigurableAndNonKeyedPartitionStream<OUT2> getSecond() { - return StreamUtils.wrapWithConfigureHandle(secondStream); - } - } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndKeyedPartitionStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndKeyedPartitionStreamImpl.java index c2c11a4c911..ccaacb0cbb8 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndKeyedPartitionStreamImpl.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndKeyedPartitionStreamImpl.java @@ -30,6 +30,7 @@ import org.apache.flink.datastream.api.stream.KeyedPartitionStream; import org.apache.flink.datastream.api.stream.KeyedPartitionStream.ProcessConfigurableAndKeyedPartitionStream; import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream; import org.apache.flink.datastream.api.stream.ProcessConfigurable; /** @@ -60,7 +61,7 @@ public class ProcessConfigurableAndKeyedPartitionStreamImpl<K, T> } @Override - public <OUT1, OUT2> TwoKeyedPartitionStreams<K, OUT1, OUT2> process( + public <OUT1, OUT2> ProcessConfigurableAndTwoKeyedPartitionStreams<K, OUT1, OUT2> process( TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction, KeySelector<OUT1, K> keySelector1, KeySelector<OUT2, K> keySelector2) { @@ -68,7 +69,7 @@ public class ProcessConfigurableAndKeyedPartitionStreamImpl<K, T> } @Override - public <OUT1, OUT2> NonKeyedPartitionStream.TwoNonKeyedPartitionStreams<OUT1, OUT2> process( + public <OUT1, OUT2> ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, OUT2> process( TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction) { return stream.process(processFunction); } @@ -127,4 +128,8 @@ public class ProcessConfigurableAndKeyedPartitionStreamImpl<K, T> public ProcessConfigurable<?> toSink(Sink<T> sink) { return stream.toSink(sink); } + + public KeyedPartitionStreamImpl<K, T> getKeyedPartitionStream() { + return stream; + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndNonKeyedPartitionStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndNonKeyedPartitionStreamImpl.java index 2ed14e67ca7..82f958d7e0a 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndNonKeyedPartitionStreamImpl.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndNonKeyedPartitionStreamImpl.java @@ -53,7 +53,7 @@ public class ProcessConfigurableAndNonKeyedPartitionStreamImpl<T> } @Override - public <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process( + public <OUT1, OUT2> ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, OUT2> process( TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction) { return stream.process(processFunction); } @@ -96,4 +96,8 @@ public class ProcessConfigurableAndNonKeyedPartitionStreamImpl<T> public ProcessConfigurable<?> toSink(Sink<T> sink) { return stream.toSink(sink); } + + public NonKeyedPartitionStreamImpl<T> getNonKeyedPartitionStream() { + return stream; + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndTwoKeyedPartitionStreamsImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndTwoKeyedPartitionStreamsImpl.java new file mode 100644 index 00000000000..617b36255b5 --- /dev/null +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndTwoKeyedPartitionStreamsImpl.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.stream; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.datastream.api.stream.KeyedPartitionStream.ProcessConfigurableAndKeyedPartitionStream; +import org.apache.flink.datastream.api.stream.KeyedPartitionStream.ProcessConfigurableAndTwoKeyedPartitionStreams; +import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.datastream.impl.utils.StreamUtils; + +/** + * {@link ProcessConfigurableAndTwoKeyedPartitionStreamsImpl} is used to hold the two output keyed + * streams and provide methods used for configuration. + */ +public class ProcessConfigurableAndTwoKeyedPartitionStreamsImpl<K, OUT1, OUT2> + extends ProcessConfigureHandle< + OUT1, ProcessConfigurableAndTwoKeyedPartitionStreams<K, OUT1, OUT2>> + implements ProcessConfigurableAndTwoKeyedPartitionStreams<K, OUT1, OUT2> { + private final KeyedPartitionStreamImpl<K, OUT1> firstStream; + + private final KeyedPartitionStreamImpl<K, OUT2> secondStream; + + public ProcessConfigurableAndTwoKeyedPartitionStreamsImpl( + ExecutionEnvironmentImpl environment, + Transformation<OUT1> transformation, + KeyedPartitionStreamImpl<K, OUT1> firstStream, + KeyedPartitionStreamImpl<K, OUT2> secondStream) { + super(environment, transformation); + this.firstStream = firstStream; + this.secondStream = secondStream; + } + + @Override + public ProcessConfigurableAndKeyedPartitionStream<K, OUT1> getFirst() { + return StreamUtils.wrapWithConfigureHandle(firstStream); + } + + @Override + public ProcessConfigurableAndKeyedPartitionStream<K, OUT2> getSecond() { + return StreamUtils.wrapWithConfigureHandle(secondStream); + } +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndTwoNonKeyedPartitionStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndTwoNonKeyedPartitionStreamImpl.java new file mode 100644 index 00000000000..dc880c70d2a --- /dev/null +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndTwoNonKeyedPartitionStreamImpl.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.stream; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream; +import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl; +import org.apache.flink.datastream.impl.utils.StreamUtils; + +/** + * {@link ProcessConfigurableAndTwoNonKeyedPartitionStreamImpl} is used to hold the two output + * non-keyed streams and provide methods used for configuration. + */ +public class ProcessConfigurableAndTwoNonKeyedPartitionStreamImpl<OUT1, OUT2> + extends ProcessConfigureHandle< + OUT1, ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, OUT2>> + implements ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, OUT2> { + + private final NonKeyedPartitionStreamImpl<OUT1> firstStream; + + private final NonKeyedPartitionStreamImpl<OUT2> secondStream; + + public ProcessConfigurableAndTwoNonKeyedPartitionStreamImpl( + ExecutionEnvironmentImpl environment, + Transformation<OUT1> transformation, + NonKeyedPartitionStreamImpl<OUT1> firstStream, + NonKeyedPartitionStreamImpl<OUT2> secondStream) { + super(environment, transformation); + this.firstStream = firstStream; + this.secondStream = secondStream; + } + + @Override + public ProcessConfigurableAndNonKeyedPartitionStream<OUT1> getFirst() { + return StreamUtils.wrapWithConfigureHandle(firstStream); + } + + @Override + public ProcessConfigurableAndNonKeyedPartitionStream<OUT2> getSecond() { + return StreamUtils.wrapWithConfigureHandle(secondStream); + } +} diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImplTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImplTest.java index db1d3ffeefe..967ea247240 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImplTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImplTest.java @@ -113,12 +113,14 @@ public class KeyedPartitionStreamImplTest { void testProcessTwoOutput() throws Exception { ExecutionEnvironmentImpl env = StreamTestUtils.getEnv(); KeyedPartitionStream<Integer, Integer> stream = createKeyedStream(env); - NonKeyedPartitionStream.TwoNonKeyedPartitionStreams<Integer, Long> resultStream1 = - stream.process(new NoOpTwoOutputStreamProcessFunction()); - assertThat(resultStream1.getFirst()).isInstanceOf(NonKeyedPartitionStream.class); - assertThat(resultStream1.getSecond()).isInstanceOf(NonKeyedPartitionStream.class); - KeyedPartitionStream.TwoKeyedPartitionStreams<Integer, Integer, Long> resultStream2 = - stream.process(new NoOpTwoOutputStreamProcessFunction(), x -> x, Math::toIntExact); + NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream<Integer, Long> + resultStream = stream.process(new NoOpTwoOutputStreamProcessFunction()); + assertThat(resultStream.getFirst()).isInstanceOf(NonKeyedPartitionStream.class); + assertThat(resultStream.getSecond()).isInstanceOf(NonKeyedPartitionStream.class); + KeyedPartitionStream.ProcessConfigurableAndTwoKeyedPartitionStreams<Integer, Integer, Long> + resultStream2 = + stream.process( + new NoOpTwoOutputStreamProcessFunction(), x -> x, Math::toIntExact); assertThat(resultStream2.getFirst()).isInstanceOf(KeyedPartitionStream.class); assertThat(resultStream2.getSecond()).isInstanceOf(KeyedPartitionStream.class); List<Transformation<?>> transformations = env.getTransformations(); diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImplTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImplTest.java index 4280f003b0d..6635327e22d 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImplTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImplTest.java @@ -140,8 +140,9 @@ class NonKeyedPartitionStreamImplTest { NonKeyedPartitionStreamImpl<Integer> stream = new NonKeyedPartitionStreamImpl<>( env, new TestingTransformation<>("t1", Types.INT, 1)); - NonKeyedPartitionStream.TwoNonKeyedPartitionStreams<Integer, Long> resultStream = - stream.process(new StreamTestUtils.NoOpTwoOutputStreamProcessFunction()); + NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream<Integer, Long> + resultStream = + stream.process(new StreamTestUtils.NoOpTwoOutputStreamProcessFunction()); assertThat(resultStream.getFirst()).isInstanceOf(NonKeyedPartitionStream.class); assertThat(resultStream.getSecond()).isInstanceOf(NonKeyedPartitionStream.class); List<Transformation<?>> transformations = env.getTransformations();
