This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 577d5caa9238c4f12c17fabe0c9285ffac01dbff Author: Xu Huang <huangxu.wal...@gmail.com> AuthorDate: Mon Jan 13 14:26:34 2025 +0800 [hotfix] Add missing generic type param for the use of PartitionedContext --- .../api/function/ApplyPartitionFunction.java | 2 +- .../api/function/OneInputStreamProcessFunction.java | 6 ++++-- .../TwoInputBroadcastStreamProcessFunction.java | 5 +++-- .../TwoInputNonBroadcastStreamProcessFunction.java | 9 +++++---- .../api/function/TwoOutputApplyPartitionFunction.java | 2 +- .../api/function/TwoOutputStreamProcessFunction.java | 4 ++-- .../datastream/impl/ExecutionEnvironmentImplTest.java | 2 +- .../StreamingJobGraphGeneratorWithAttributeTest.java | 6 +++--- .../datastream/impl/functions/ProcessFunctionTest.java | 18 +++++++++++++----- .../impl/operators/KeyedProcessOperatorTest.java | 6 +++--- .../KeyedTwoInputBroadcastProcessOperatorTest.java | 6 +++--- .../KeyedTwoInputNonBroadcastProcessOperatorTest.java | 18 ++++++++++++------ .../operators/KeyedTwoOutputProcessOperatorTest.java | 6 +++--- .../impl/operators/MockFreqCountProcessFunction.java | 3 ++- .../MockGlobalDecuplicateCountProcessFunction.java | 3 ++- .../MockGlobalListAppenderProcessFunction.java | 3 ++- .../operators/MockListAppenderProcessFunction.java | 3 ++- .../impl/operators/MockMultiplierProcessFunction.java | 3 ++- .../MockRecudingMultiplierProcessFunction.java | 3 ++- .../operators/MockSumAggregateProcessFunction.java | 3 ++- .../datastream/impl/operators/ProcessOperatorTest.java | 2 +- .../TwoInputBroadcastProcessOperatorTest.java | 4 ++-- .../TwoInputNonBroadcastProcessOperatorTest.java | 14 ++++++++++---- .../impl/operators/TwoOutputProcessOperatorTest.java | 4 ++-- .../flink/datastream/impl/stream/StreamTestUtils.java | 12 +++++++----- .../flink/datastream/impl/utils/StreamUtilsTest.java | 14 +++++++++----- .../datastream/impl/utils/WatermarkUtilsTest.java | 4 ++-- .../api/datastream/StatefulDataStreamV2ITCase.java | 18 ++++++++++++------ .../test/streaming/api/datastream/WatermarkITCase.java | 6 +++--- 29 files changed, 116 insertions(+), 73 deletions(-) diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/ApplyPartitionFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/ApplyPartitionFunction.java index 6b86aafab1a..b5e1c8f478b 100644 --- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/ApplyPartitionFunction.java +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/ApplyPartitionFunction.java @@ -33,5 +33,5 @@ public interface ApplyPartitionFunction<OUT> extends Function { * @param collector to output data. * @param ctx runtime context in which this function is executed. */ - void apply(Collector<OUT> collector, PartitionedContext ctx) throws Exception; + void apply(Collector<OUT> collector, PartitionedContext<OUT> ctx) throws Exception; } diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/OneInputStreamProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/OneInputStreamProcessFunction.java index 5f3c694fa11..ad1a20454bf 100644 --- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/OneInputStreamProcessFunction.java +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/OneInputStreamProcessFunction.java @@ -47,7 +47,8 @@ public interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction * @param output to emit processed records. * @param ctx runtime context in which this function is executed. */ - void processRecord(IN record, Collector<OUT> output, PartitionedContext ctx) throws Exception; + void processRecord(IN record, Collector<OUT> output, PartitionedContext<OUT> ctx) + throws Exception; /** * This is a life-cycle method indicates that this function will no longer receive any data from @@ -64,7 +65,8 @@ public interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction * @param output to emit record. * @param ctx runtime context in which this function is executed. */ - default void onProcessingTimer(long timestamp, Collector<OUT> output, PartitionedContext ctx) {} + default void onProcessingTimer( + long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) {} /** Callback function when receive watermark. */ default WatermarkHandlingResult onWatermark( diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputBroadcastStreamProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputBroadcastStreamProcessFunction.java index 7e761c0d148..fc583dacacc 100644 --- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputBroadcastStreamProcessFunction.java +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputBroadcastStreamProcessFunction.java @@ -51,7 +51,7 @@ public interface TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> extends P * @param ctx runtime context in which this function is executed. */ void processRecordFromNonBroadcastInput( - IN1 record, Collector<OUT> output, PartitionedContext ctx) throws Exception; + IN1 record, Collector<OUT> output, PartitionedContext<OUT> ctx) throws Exception; /** * Process record from broadcast input. In general, the broadcast side is not allowed to @@ -87,7 +87,8 @@ public interface TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> extends P * @param output to emit record. * @param ctx runtime context in which this function is executed. */ - default void onProcessingTimer(long timestamp, Collector<OUT> output, PartitionedContext ctx) {} + default void onProcessingTimer( + long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) {} /** * Callback function when receive the watermark from broadcast input. diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputNonBroadcastStreamProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputNonBroadcastStreamProcessFunction.java index 82eff4ddf88..356cc97e2fb 100644 --- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputNonBroadcastStreamProcessFunction.java +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputNonBroadcastStreamProcessFunction.java @@ -47,7 +47,7 @@ public interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extend * @param output to emit processed records. * @param ctx runtime context in which this function is executed. */ - void processRecordFromFirstInput(IN1 record, Collector<OUT> output, PartitionedContext ctx) + void processRecordFromFirstInput(IN1 record, Collector<OUT> output, PartitionedContext<OUT> ctx) throws Exception; /** @@ -57,8 +57,8 @@ public interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extend * @param output to emit processed records. * @param ctx runtime context in which this function is executed. */ - void processRecordFromSecondInput(IN2 record, Collector<OUT> output, PartitionedContext ctx) - throws Exception; + void processRecordFromSecondInput( + IN2 record, Collector<OUT> output, PartitionedContext<OUT> ctx) throws Exception; /** * This is a life-cycle method indicates that this function will no longer receive any data from @@ -83,7 +83,8 @@ public interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extend * @param output to emit record. * @param ctx runtime context in which this function is executed. */ - default void onProcessingTimer(long timestamp, Collector<OUT> output, PartitionedContext ctx) {} + default void onProcessingTimer( + long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) {} /** * Callback function when receive the watermark from the first input. diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoOutputApplyPartitionFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoOutputApplyPartitionFunction.java index 221e08ada41..8fefdadedae 100644 --- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoOutputApplyPartitionFunction.java +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoOutputApplyPartitionFunction.java @@ -37,6 +37,6 @@ public interface TwoOutputApplyPartitionFunction<OUT1, OUT2> extends Function { void apply( Collector<OUT1> firstOutput, Collector<OUT2> secondOutput, - TwoOutputPartitionedContext ctx) + TwoOutputPartitionedContext<OUT1, OUT2> ctx) throws Exception; } diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoOutputStreamProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoOutputStreamProcessFunction.java index b2c91f8a4e3..1136ebc5053 100644 --- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoOutputStreamProcessFunction.java +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoOutputStreamProcessFunction.java @@ -52,7 +52,7 @@ public interface TwoOutputStreamProcessFunction<IN, OUT1, OUT2> extends ProcessF IN record, Collector<OUT1> output1, Collector<OUT2> output2, - TwoOutputPartitionedContext ctx) + TwoOutputPartitionedContext<OUT1, OUT2> ctx) throws Exception; /** @@ -75,7 +75,7 @@ public interface TwoOutputStreamProcessFunction<IN, OUT1, OUT2> extends ProcessF long timestamp, Collector<OUT1> output1, Collector<OUT2> output2, - TwoOutputPartitionedContext ctx) {} + TwoOutputPartitionedContext<OUT1, OUT2> ctx) {} /** * Callback function when receive the watermark from the input. diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImplTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImplTest.java index 4ab33e12df5..859f1c9637c 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImplTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImplTest.java @@ -113,7 +113,7 @@ class ExecutionEnvironmentImplTest { new OneInputStreamProcessFunction<Long, Long>() { @Override public void processRecord( - Long record, Collector<Long> output, PartitionedContext ctx) + Long record, Collector<Long> output, PartitionedContext<Long> ctx) throws Exception { // do nothing. } diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java index 1574166530b..fffd5557a18 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java @@ -212,7 +212,7 @@ class StreamingJobGraphGeneratorWithAttributeTest { @Override public void processRecord( - Integer record, Collector<Integer> output, PartitionedContext ctx) { + Integer record, Collector<Integer> output, PartitionedContext<Integer> ctx) { output.collect(record + 1); } } @@ -221,7 +221,7 @@ class StreamingJobGraphGeneratorWithAttributeTest { @Override public void processRecord( - Integer record, Collector<Integer> output, PartitionedContext ctx) { + Integer record, Collector<Integer> output, PartitionedContext<Integer> ctx) { if (record != 2) { output.collect(record + 1); } @@ -237,7 +237,7 @@ class StreamingJobGraphGeneratorWithAttributeTest { Integer record, Collector<Integer> output1, Collector<Integer> output2, - TwoOutputPartitionedContext ctx) { + TwoOutputPartitionedContext<Integer, Integer> ctx) { output1.collect(record + 1); output2.collect(record - 1); } diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/functions/ProcessFunctionTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/functions/ProcessFunctionTest.java index 600e03c26b0..423ecde16e8 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/functions/ProcessFunctionTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/functions/ProcessFunctionTest.java @@ -59,7 +59,9 @@ public class ProcessFunctionTest { @Override public void processRecord( - Integer record, Collector<Integer> output, PartitionedContext ctx) + Integer record, + Collector<Integer> output, + PartitionedContext<Integer> ctx) throws Exception {} @Override @@ -94,7 +96,9 @@ public class ProcessFunctionTest { @Override public void processRecordFromNonBroadcastInput( - Integer record, Collector<Integer> output, PartitionedContext ctx) + Integer record, + Collector<Integer> output, + PartitionedContext<Integer> ctx) throws Exception {} @Override @@ -134,12 +138,16 @@ public class ProcessFunctionTest { @Override public void processRecordFromFirstInput( - Integer record, Collector<Integer> output, PartitionedContext ctx) + Integer record, + Collector<Integer> output, + PartitionedContext<Integer> ctx) throws Exception {} @Override public void processRecordFromSecondInput( - Integer record, Collector<Integer> output, PartitionedContext ctx) + Integer record, + Collector<Integer> output, + PartitionedContext<Integer> ctx) throws Exception {} @Override @@ -179,7 +187,7 @@ public class ProcessFunctionTest { Integer record, Collector<Integer> output1, Collector<Integer> output2, - TwoOutputPartitionedContext ctx) + TwoOutputPartitionedContext<Integer, Integer> ctx) throws Exception {} @Override diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperatorTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperatorTest.java index d744ba003a1..f3cd604c445 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperatorTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperatorTest.java @@ -46,7 +46,7 @@ class KeyedProcessOperatorTest { public void processRecord( Integer record, Collector<Integer> output, - PartitionedContext ctx) { + PartitionedContext<Integer> ctx) { output.collect(record + 1); } }); @@ -78,7 +78,7 @@ class KeyedProcessOperatorTest { public void processRecord( Integer record, Collector<Integer> output, - PartitionedContext ctx) { + PartitionedContext<Integer> ctx) { // do nothing. } @@ -125,7 +125,7 @@ class KeyedProcessOperatorTest { public void processRecord( Integer record, Collector<Integer> output, - PartitionedContext ctx) { + PartitionedContext<Integer> ctx) { // forward the record to check input key. output.collect(record); } diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperatorTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperatorTest.java index 7ad6070073b..c9e4b65276c 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperatorTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperatorTest.java @@ -50,7 +50,7 @@ class KeyedTwoInputBroadcastProcessOperatorTest { public void processRecordFromNonBroadcastInput( Integer record, Collector<Long> output, - PartitionedContext ctx) { + PartitionedContext<Long> ctx) { fromNonBroadcastInput.add(record); } @@ -89,7 +89,7 @@ class KeyedTwoInputBroadcastProcessOperatorTest { public void processRecordFromNonBroadcastInput( Integer record, Collector<Long> output, - PartitionedContext ctx) { + PartitionedContext<Long> ctx) { // do nothing. } @@ -161,7 +161,7 @@ class KeyedTwoInputBroadcastProcessOperatorTest { public void processRecordFromNonBroadcastInput( Integer record, Collector<Long> output, - PartitionedContext ctx) { + PartitionedContext<Long> ctx) { output.collect(Long.valueOf(record)); } diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperatorTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperatorTest.java index 75d9b616033..4ff5501768f 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperatorTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperatorTest.java @@ -46,13 +46,15 @@ class KeyedTwoInputNonBroadcastProcessOperatorTest { public void processRecordFromFirstInput( Integer record, Collector<Long> output, - PartitionedContext ctx) { + PartitionedContext<Long> ctx) { output.collect(Long.valueOf(record)); } @Override public void processRecordFromSecondInput( - Long record, Collector<Long> output, PartitionedContext ctx) { + Long record, + Collector<Long> output, + PartitionedContext<Long> ctx) { output.collect(record); } }); @@ -89,13 +91,15 @@ class KeyedTwoInputNonBroadcastProcessOperatorTest { public void processRecordFromFirstInput( Integer record, Collector<Long> output, - PartitionedContext ctx) { + PartitionedContext<Long> ctx) { // do nothing. } @Override public void processRecordFromSecondInput( - Long record, Collector<Long> output, PartitionedContext ctx) { + Long record, + Collector<Long> output, + PartitionedContext<Long> ctx) { // do nothing. } @@ -164,13 +168,15 @@ class KeyedTwoInputNonBroadcastProcessOperatorTest { public void processRecordFromFirstInput( Integer record, Collector<Long> output, - PartitionedContext ctx) { + PartitionedContext<Long> ctx) { output.collect(Long.valueOf(record)); } @Override public void processRecordFromSecondInput( - Long record, Collector<Long> output, PartitionedContext ctx) { + Long record, + Collector<Long> output, + PartitionedContext<Long> ctx) { output.collect(record); } }, diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperatorTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperatorTest.java index 3de47a0c15d..7d0680325b9 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperatorTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperatorTest.java @@ -52,7 +52,7 @@ class KeyedTwoOutputProcessOperatorTest { Integer record, Collector<Integer> output1, Collector<Long> output2, - TwoOutputPartitionedContext ctx) { + TwoOutputPartitionedContext<Integer, Long> ctx) { output1.collect(record); output2.collect((long) (record * 2)); } @@ -93,7 +93,7 @@ class KeyedTwoOutputProcessOperatorTest { Integer record, Collector<Integer> output1, Collector<Long> output2, - TwoOutputPartitionedContext ctx) { + TwoOutputPartitionedContext<Integer, Long> ctx) { // do nothing. } @@ -147,7 +147,7 @@ class KeyedTwoOutputProcessOperatorTest { Integer record, Collector<Integer> output1, Collector<Long> output2, - TwoOutputPartitionedContext ctx) { + TwoOutputPartitionedContext<Integer, Long> ctx) { if (emitToFirstOutput.get()) { output1.collect(record); } else { diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockFreqCountProcessFunction.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockFreqCountProcessFunction.java index 10ec3576c9d..dc4c527b9b7 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockFreqCountProcessFunction.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockFreqCountProcessFunction.java @@ -44,7 +44,8 @@ public class MockFreqCountProcessFunction } @Override - public void processRecord(Integer record, Collector<Integer> output, PartitionedContext ctx) + public void processRecord( + Integer record, Collector<Integer> output, PartitionedContext<Integer> ctx) throws Exception { Optional<MapState<Integer, Integer>> stateOptional = ctx.getStateManager().getState(mapStateDeclaration); diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockGlobalDecuplicateCountProcessFunction.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockGlobalDecuplicateCountProcessFunction.java index aca147b3ae5..97aed0046cd 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockGlobalDecuplicateCountProcessFunction.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockGlobalDecuplicateCountProcessFunction.java @@ -47,7 +47,8 @@ public class MockGlobalDecuplicateCountProcessFunction } @Override - public void processRecord(Integer record, Collector<Integer> output, PartitionedContext ctx) + public void processRecord( + Integer record, Collector<Integer> output, PartitionedContext<Integer> ctx) throws Exception { Optional<BroadcastState<Integer, Integer>> stateOptional = ctx.getStateManager().getState(broadcastStateDeclaration); diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockGlobalListAppenderProcessFunction.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockGlobalListAppenderProcessFunction.java index 53e214a9dcc..639b5d8cb8e 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockGlobalListAppenderProcessFunction.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockGlobalListAppenderProcessFunction.java @@ -62,7 +62,8 @@ public class MockGlobalListAppenderProcessFunction } @Override - public void processRecord(Integer record, Collector<Integer> output, PartitionedContext ctx) + public void processRecord( + Integer record, Collector<Integer> output, PartitionedContext<Integer> ctx) throws Exception { Optional<ListState<Integer>> stateOptional = ctx.getStateManager().getState(listStateDeclaration); diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockListAppenderProcessFunction.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockListAppenderProcessFunction.java index 35c77576875..af89a50751a 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockListAppenderProcessFunction.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockListAppenderProcessFunction.java @@ -61,7 +61,8 @@ public class MockListAppenderProcessFunction } @Override - public void processRecord(Integer record, Collector<Integer> output, PartitionedContext ctx) + public void processRecord( + Integer record, Collector<Integer> output, PartitionedContext<Integer> ctx) throws Exception { Optional<ListState<Integer>> stateOptional = ctx.getStateManager().getState(listStateDeclaration); diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockMultiplierProcessFunction.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockMultiplierProcessFunction.java index 20f8aadb7a4..0798461ec2c 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockMultiplierProcessFunction.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockMultiplierProcessFunction.java @@ -44,7 +44,8 @@ public class MockMultiplierProcessFunction } @Override - public void processRecord(Integer record, Collector<Integer> output, PartitionedContext ctx) + public void processRecord( + Integer record, Collector<Integer> output, PartitionedContext<Integer> ctx) throws Exception { Optional<ValueState<Integer>> stateOptional = ctx.getStateManager().getState(valueStateDeclaration); diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockRecudingMultiplierProcessFunction.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockRecudingMultiplierProcessFunction.java index ca9d312b056..4aa22dcfc97 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockRecudingMultiplierProcessFunction.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockRecudingMultiplierProcessFunction.java @@ -53,7 +53,8 @@ public class MockRecudingMultiplierProcessFunction } @Override - public void processRecord(Integer record, Collector<Integer> output, PartitionedContext ctx) + public void processRecord( + Integer record, Collector<Integer> output, PartitionedContext<Integer> ctx) throws Exception { Optional<ReducingState<Integer>> stateOptional = ctx.getStateManager().getState(reducingStateDeclaration); diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockSumAggregateProcessFunction.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockSumAggregateProcessFunction.java index 8a995dc010a..7b2661d02ea 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockSumAggregateProcessFunction.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockSumAggregateProcessFunction.java @@ -69,7 +69,8 @@ public class MockSumAggregateProcessFunction } @Override - public void processRecord(Integer record, Collector<Integer> output, PartitionedContext ctx) + public void processRecord( + Integer record, Collector<Integer> output, PartitionedContext<Integer> ctx) throws Exception { Optional<AggregatingState<Integer, Integer>> stateOptional = ctx.getStateManager().getState(aggregatingStateDeclaration); diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/ProcessOperatorTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/ProcessOperatorTest.java index fe4419b14e7..5723c84575c 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/ProcessOperatorTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/ProcessOperatorTest.java @@ -65,7 +65,7 @@ class ProcessOperatorTest { public void processRecord( Integer record, Collector<String> output, - PartitionedContext ctx) { + PartitionedContext<String> ctx) { // do nothing. } diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperatorTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperatorTest.java index 1e50d7ff864..a738814280b 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperatorTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperatorTest.java @@ -48,7 +48,7 @@ class TwoInputBroadcastProcessOperatorTest { public void processRecordFromNonBroadcastInput( Integer record, Collector<Long> output, - PartitionedContext ctx) { + PartitionedContext<Long> ctx) { fromNonBroadcastInput.add(Long.valueOf(record)); } @@ -84,7 +84,7 @@ class TwoInputBroadcastProcessOperatorTest { public void processRecordFromNonBroadcastInput( Integer record, Collector<Long> output, - PartitionedContext ctx) { + PartitionedContext<Long> ctx) { // do nothing. } diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperatorTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperatorTest.java index e4774f3de8c..b05b0da43c2 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperatorTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperatorTest.java @@ -43,13 +43,15 @@ class TwoInputNonBroadcastProcessOperatorTest { public void processRecordFromFirstInput( Integer record, Collector<Long> output, - PartitionedContext ctx) { + PartitionedContext<Long> ctx) { output.collect(Long.valueOf(record)); } @Override public void processRecordFromSecondInput( - Long record, Collector<Long> output, PartitionedContext ctx) { + Long record, + Collector<Long> output, + PartitionedContext<Long> ctx) { output.collect(record); } }); @@ -82,14 +84,18 @@ class TwoInputNonBroadcastProcessOperatorTest { new TwoInputNonBroadcastStreamProcessFunction<Integer, Long, Long>() { @Override public void processRecordFromFirstInput( - Integer record, Collector<Long> output, PartitionedContext ctx) + Integer record, + Collector<Long> output, + PartitionedContext<Long> ctx) throws Exception { // do nothing. } @Override public void processRecordFromSecondInput( - Long record, Collector<Long> output, PartitionedContext ctx) + Long record, + Collector<Long> output, + PartitionedContext<Long> ctx) throws Exception { // do nothing. } diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperatorTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperatorTest.java index 7c182d90cc5..88f459d57e9 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperatorTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperatorTest.java @@ -48,7 +48,7 @@ class TwoOutputProcessOperatorTest { Integer record, Collector<Integer> output1, Collector<Long> output2, - TwoOutputPartitionedContext ctx) { + TwoOutputPartitionedContext<Integer, Long> ctx) { output1.collect(record); output2.collect((long) (record * 2)); } @@ -86,7 +86,7 @@ class TwoOutputProcessOperatorTest { Integer record, Collector<Integer> output1, Collector<Long> output2, - TwoOutputPartitionedContext ctx) { + TwoOutputPartitionedContext<Integer, Long> ctx) { // do nothing. } diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/StreamTestUtils.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/StreamTestUtils.java index 410a13c4c96..31d2d8cc07f 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/StreamTestUtils.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/StreamTestUtils.java @@ -81,7 +81,8 @@ public final class StreamTestUtils { } @Override - public void processRecord(Integer record, Collector<Long> output, PartitionedContext ctx) { + public void processRecord( + Integer record, Collector<Long> output, PartitionedContext<Long> ctx) { // do nothing. } } @@ -110,7 +111,7 @@ public final class StreamTestUtils { Integer record, Collector<Integer> output1, Collector<Long> output2, - TwoOutputPartitionedContext ctx) { + TwoOutputPartitionedContext<Integer, Long> ctx) { // do nothing. } } @@ -139,13 +140,14 @@ public final class StreamTestUtils { @Override public void processRecordFromFirstInput( - Integer record, Collector<Long> output, PartitionedContext ctx) { + Integer record, Collector<Long> output, PartitionedContext<Long> ctx) { // do nothing. } @Override public void processRecordFromSecondInput( - Long record, Collector<Long> output, PartitionedContext ctx) throws Exception { + Long record, Collector<Long> output, PartitionedContext<Long> ctx) + throws Exception { // do nothing. } } @@ -174,7 +176,7 @@ public final class StreamTestUtils { @Override public void processRecordFromNonBroadcastInput( - Long record, Collector<Long> output, PartitionedContext ctx) { + Long record, Collector<Long> output, PartitionedContext<Long> ctx) { // do nothing. } diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/utils/StreamUtilsTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/utils/StreamUtilsTest.java index b1baa37d9e2..74c23dfb5b4 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/utils/StreamUtilsTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/utils/StreamUtilsTest.java @@ -53,7 +53,9 @@ class StreamUtilsTest { new OneInputStreamProcessFunction<Integer, Long>() { @Override public void processRecord( - Integer record, Collector<Long> output, PartitionedContext ctx) + Integer record, + Collector<Long> output, + PartitionedContext<Long> ctx) throws Exception { // ignore } @@ -83,14 +85,16 @@ class StreamUtilsTest { public void processRecordFromFirstInput( Integer record, Collector<String> output, - PartitionedContext ctx) + PartitionedContext<String> ctx) throws Exception { // ignore } @Override public void processRecordFromSecondInput( - Long record, Collector<String> output, PartitionedContext ctx) + Long record, + Collector<String> output, + PartitionedContext<String> ctx) throws Exception { // ignore } @@ -106,7 +110,7 @@ class StreamUtilsTest { public void processRecordFromNonBroadcastInput( Integer record, Collector<String> output, - PartitionedContext ctx) + PartitionedContext<String> ctx) throws Exception { // ignore } @@ -133,7 +137,7 @@ class StreamUtilsTest { Integer record, Collector<Long> output1, Collector<String> output2, - TwoOutputPartitionedContext ctx) + TwoOutputPartitionedContext<Long, String> ctx) throws Exception { // ignore } diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/utils/WatermarkUtilsTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/utils/WatermarkUtilsTest.java index 22e2fe7e3cb..7285957f875 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/utils/WatermarkUtilsTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/utils/WatermarkUtilsTest.java @@ -119,7 +119,7 @@ class WatermarkUtilsTest { public void processRecord( Integer record, Collector<Integer> output, - PartitionedContext ctx) + PartitionedContext<Integer> ctx) throws Exception {} @Override @@ -137,7 +137,7 @@ class WatermarkUtilsTest { Integer record, Collector<Integer> output1, Collector<Integer> output2, - TwoOutputPartitionedContext ctx) + TwoOutputPartitionedContext<Integer, Integer> ctx) throws Exception {} @Override diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/StatefulDataStreamV2ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/StatefulDataStreamV2ITCase.java index 646b7125d9b..51106835627 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/StatefulDataStreamV2ITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/StatefulDataStreamV2ITCase.java @@ -166,7 +166,8 @@ class StatefulDataStreamV2ITCase { } @Override - public void processRecord(Long record, Collector<String> output, PartitionedContext ctx) + public void processRecord( + Long record, Collector<String> output, PartitionedContext<String> ctx) throws Exception { Optional<AggregatingState<Long, Long>> maybeState = ctx.getStateManager().getState(stateDeclaration); @@ -191,7 +192,8 @@ class StatefulDataStreamV2ITCase { } @Override - public void processRecord(Long record, Collector<String> output, PartitionedContext ctx) + public void processRecord( + Long record, Collector<String> output, PartitionedContext<String> ctx) throws Exception { Optional<ReducingState<Long>> maybeState = ctx.getStateManager().getState(stateDeclaration); @@ -219,7 +221,8 @@ class StatefulDataStreamV2ITCase { } @Override - public void processRecord(Long record, Collector<String> output, PartitionedContext ctx) + public void processRecord( + Long record, Collector<String> output, PartitionedContext<String> ctx) throws Exception { Optional<MapState<Long, Long>> maybeState = ctx.getStateManager().getState(stateDeclaration); @@ -247,7 +250,8 @@ class StatefulDataStreamV2ITCase { } @Override - public void processRecord(Long record, Collector<String> output, PartitionedContext ctx) + public void processRecord( + Long record, Collector<String> output, PartitionedContext<String> ctx) throws Exception { Optional<ListState<Long>> maybeState = ctx.getStateManager().getState(stateDeclaration); if (!maybeState.isPresent()) { @@ -279,7 +283,8 @@ class StatefulDataStreamV2ITCase { } @Override - public void processRecord(Long record, Collector<String> output, PartitionedContext ctx) + public void processRecord( + Long record, Collector<String> output, PartitionedContext<String> ctx) throws Exception { Optional<ValueState<Long>> maybeState = ctx.getStateManager().getState(stateDeclaration); @@ -308,7 +313,8 @@ class StatefulDataStreamV2ITCase { } @Override - public void processRecord(String record, Collector<Object> output, PartitionedContext ctx) + public void processRecord( + String record, Collector<Object> output, PartitionedContext<Object> ctx) throws Exception { if (!allValues.contains(record)) { throw new FlinkRuntimeException("Record not found: " + record); diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkITCase.java index 237af8bf894..e4a4db55991 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkITCase.java @@ -653,7 +653,7 @@ class WatermarkITCase { } @Override - public void processRecord(Long record, Collector<Long> output, PartitionedContext ctx) + public void processRecord(Long record, Collector<Long> output, PartitionedContext<Long> ctx) throws Exception { receivedRecords.add(record); output.collect(record * 2); @@ -722,7 +722,7 @@ class WatermarkITCase { } @Override - public void processRecord(Long record, Collector<Long> output, PartitionedContext ctx) + public void processRecord(Long record, Collector<Long> output, PartitionedContext<Long> ctx) throws Exception { receivedRecords.add(record); output.collect(record + 1); @@ -769,7 +769,7 @@ class WatermarkITCase { } @Override - public void processRecord(Long record, Collector<Long> output, PartitionedContext ctx) + public void processRecord(Long record, Collector<Long> output, PartitionedContext<Long> ctx) throws Exception { receivedRecords.add(record); }