This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 577d5caa9238c4f12c17fabe0c9285ffac01dbff
Author: Xu Huang <huangxu.wal...@gmail.com>
AuthorDate: Mon Jan 13 14:26:34 2025 +0800

    [hotfix] Add missing generic type param for the use of PartitionedContext
---
 .../api/function/ApplyPartitionFunction.java           |  2 +-
 .../api/function/OneInputStreamProcessFunction.java    |  6 ++++--
 .../TwoInputBroadcastStreamProcessFunction.java        |  5 +++--
 .../TwoInputNonBroadcastStreamProcessFunction.java     |  9 +++++----
 .../api/function/TwoOutputApplyPartitionFunction.java  |  2 +-
 .../api/function/TwoOutputStreamProcessFunction.java   |  4 ++--
 .../datastream/impl/ExecutionEnvironmentImplTest.java  |  2 +-
 .../StreamingJobGraphGeneratorWithAttributeTest.java   |  6 +++---
 .../datastream/impl/functions/ProcessFunctionTest.java | 18 +++++++++++++-----
 .../impl/operators/KeyedProcessOperatorTest.java       |  6 +++---
 .../KeyedTwoInputBroadcastProcessOperatorTest.java     |  6 +++---
 .../KeyedTwoInputNonBroadcastProcessOperatorTest.java  | 18 ++++++++++++------
 .../operators/KeyedTwoOutputProcessOperatorTest.java   |  6 +++---
 .../impl/operators/MockFreqCountProcessFunction.java   |  3 ++-
 .../MockGlobalDecuplicateCountProcessFunction.java     |  3 ++-
 .../MockGlobalListAppenderProcessFunction.java         |  3 ++-
 .../operators/MockListAppenderProcessFunction.java     |  3 ++-
 .../impl/operators/MockMultiplierProcessFunction.java  |  3 ++-
 .../MockRecudingMultiplierProcessFunction.java         |  3 ++-
 .../operators/MockSumAggregateProcessFunction.java     |  3 ++-
 .../datastream/impl/operators/ProcessOperatorTest.java |  2 +-
 .../TwoInputBroadcastProcessOperatorTest.java          |  4 ++--
 .../TwoInputNonBroadcastProcessOperatorTest.java       | 14 ++++++++++----
 .../impl/operators/TwoOutputProcessOperatorTest.java   |  4 ++--
 .../flink/datastream/impl/stream/StreamTestUtils.java  | 12 +++++++-----
 .../flink/datastream/impl/utils/StreamUtilsTest.java   | 14 +++++++++-----
 .../datastream/impl/utils/WatermarkUtilsTest.java      |  4 ++--
 .../api/datastream/StatefulDataStreamV2ITCase.java     | 18 ++++++++++++------
 .../test/streaming/api/datastream/WatermarkITCase.java |  6 +++---
 29 files changed, 116 insertions(+), 73 deletions(-)

diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/ApplyPartitionFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/ApplyPartitionFunction.java
index 6b86aafab1a..b5e1c8f478b 100644
--- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/ApplyPartitionFunction.java
+++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/ApplyPartitionFunction.java
@@ -33,5 +33,5 @@ public interface ApplyPartitionFunction<OUT> extends Function {
      * @param collector to output data.
      * @param ctx runtime context in which this function is executed.
      */
-    void apply(Collector<OUT> collector, PartitionedContext ctx) throws Exception;
+    void apply(Collector<OUT> collector, PartitionedContext<OUT> ctx) throws Exception;
 }
diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/OneInputStreamProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/OneInputStreamProcessFunction.java
index 5f3c694fa11..ad1a20454bf 100644
--- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/OneInputStreamProcessFunction.java
+++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/OneInputStreamProcessFunction.java
@@ -47,7 +47,8 @@ public interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction
      * @param output to emit processed records.
      * @param ctx runtime context in which this function is executed.
      */
-    void processRecord(IN record, Collector<OUT> output, PartitionedContext ctx) throws Exception;
+    void processRecord(IN record, Collector<OUT> output, PartitionedContext<OUT> ctx)
+            throws Exception;
 
     /**
      * This is a life-cycle method indicates that this function will no longer receive any data from
@@ -64,7 +65,8 @@ public interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction
      * @param output to emit record.
      * @param ctx runtime context in which this function is executed.
      */
-    default void onProcessingTimer(long timestamp, Collector<OUT> output, PartitionedContext ctx) {}
+    default void onProcessingTimer(
+            long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) {}
 
     /** Callback function when receive watermark. */
     default WatermarkHandlingResult onWatermark(
diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputBroadcastStreamProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputBroadcastStreamProcessFunction.java
index 7e761c0d148..fc583dacacc 100644
--- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputBroadcastStreamProcessFunction.java
+++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputBroadcastStreamProcessFunction.java
@@ -51,7 +51,7 @@ public interface TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> extends P
      * @param ctx runtime context in which this function is executed.
      */
     void processRecordFromNonBroadcastInput(
-            IN1 record, Collector<OUT> output, PartitionedContext ctx) throws Exception;
+            IN1 record, Collector<OUT> output, PartitionedContext<OUT> ctx) throws Exception;
 
     /**
      * Process record from broadcast input. In general, the broadcast side is not allowed to
@@ -87,7 +87,8 @@ public interface TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> extends P
      * @param output to emit record.
      * @param ctx runtime context in which this function is executed.
      */
-    default void onProcessingTimer(long timestamp, Collector<OUT> output, PartitionedContext ctx) {}
+    default void onProcessingTimer(
+            long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) {}
 
     /**
      * Callback function when receive the watermark from broadcast input.
diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputNonBroadcastStreamProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputNonBroadcastStreamProcessFunction.java
index 82eff4ddf88..356cc97e2fb 100644
--- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputNonBroadcastStreamProcessFunction.java
+++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputNonBroadcastStreamProcessFunction.java
@@ -47,7 +47,7 @@ public interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extend
      * @param output to emit processed records.
      * @param ctx runtime context in which this function is executed.
      */
-    void processRecordFromFirstInput(IN1 record, Collector<OUT> output, PartitionedContext ctx)
+    void processRecordFromFirstInput(IN1 record, Collector<OUT> output, PartitionedContext<OUT> ctx)
             throws Exception;
 
     /**
@@ -57,8 +57,8 @@ public interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extend
      * @param output to emit processed records.
      * @param ctx runtime context in which this function is executed.
      */
-    void processRecordFromSecondInput(IN2 record, Collector<OUT> output, PartitionedContext ctx)
-            throws Exception;
+    void processRecordFromSecondInput(
+            IN2 record, Collector<OUT> output, PartitionedContext<OUT> ctx) throws Exception;
 
     /**
      * This is a life-cycle method indicates that this function will no longer receive any data from
@@ -83,7 +83,8 @@ public interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extend
      * @param output to emit record.
      * @param ctx runtime context in which this function is executed.
      */
-    default void onProcessingTimer(long timestamp, Collector<OUT> output, PartitionedContext ctx) {}
+    default void onProcessingTimer(
+            long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) {}
 
     /**
      * Callback function when receive the watermark from the first input.
diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoOutputApplyPartitionFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoOutputApplyPartitionFunction.java
index 221e08ada41..8fefdadedae 100644
--- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoOutputApplyPartitionFunction.java
+++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoOutputApplyPartitionFunction.java
@@ -37,6 +37,6 @@ public interface TwoOutputApplyPartitionFunction<OUT1, OUT2> extends Function {
     void apply(
             Collector<OUT1> firstOutput,
             Collector<OUT2> secondOutput,
-            TwoOutputPartitionedContext ctx)
+            TwoOutputPartitionedContext<OUT1, OUT2> ctx)
             throws Exception;
 }
diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoOutputStreamProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoOutputStreamProcessFunction.java
index b2c91f8a4e3..1136ebc5053 100644
--- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoOutputStreamProcessFunction.java
+++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoOutputStreamProcessFunction.java
@@ -52,7 +52,7 @@ public interface TwoOutputStreamProcessFunction<IN, OUT1, OUT2> extends ProcessF
             IN record,
             Collector<OUT1> output1,
             Collector<OUT2> output2,
-            TwoOutputPartitionedContext ctx)
+            TwoOutputPartitionedContext<OUT1, OUT2> ctx)
             throws Exception;
 
     /**
@@ -75,7 +75,7 @@ public interface TwoOutputStreamProcessFunction<IN, OUT1, OUT2> extends ProcessF
             long timestamp,
             Collector<OUT1> output1,
             Collector<OUT2> output2,
-            TwoOutputPartitionedContext ctx) {}
+            TwoOutputPartitionedContext<OUT1, OUT2> ctx) {}
 
     /**
      * Callback function when receive the watermark from the input.
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImplTest.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImplTest.java
index 4ab33e12df5..859f1c9637c 100644
--- 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImplTest.java
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImplTest.java
@@ -113,7 +113,7 @@ class ExecutionEnvironmentImplTest {
                 new OneInputStreamProcessFunction<Long, Long>() {
                     @Override
                     public void processRecord(
-                            Long record, Collector<Long> output, 
PartitionedContext ctx)
+                            Long record, Collector<Long> output, 
PartitionedContext<Long> ctx)
                             throws Exception {
                         // do nothing.
                     }
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java
index 1574166530b..fffd5557a18 100644
--- 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java
@@ -212,7 +212,7 @@ class StreamingJobGraphGeneratorWithAttributeTest {
 
         @Override
         public void processRecord(
-                Integer record, Collector<Integer> output, PartitionedContext 
ctx) {
+                Integer record, Collector<Integer> output, 
PartitionedContext<Integer> ctx) {
             output.collect(record + 1);
         }
     }
@@ -221,7 +221,7 @@ class StreamingJobGraphGeneratorWithAttributeTest {
 
         @Override
         public void processRecord(
-                Integer record, Collector<Integer> output, PartitionedContext 
ctx) {
+                Integer record, Collector<Integer> output, 
PartitionedContext<Integer> ctx) {
             if (record != 2) {
                 output.collect(record + 1);
             }
@@ -237,7 +237,7 @@ class StreamingJobGraphGeneratorWithAttributeTest {
                 Integer record,
                 Collector<Integer> output1,
                 Collector<Integer> output2,
-                TwoOutputPartitionedContext ctx) {
+                TwoOutputPartitionedContext<Integer, Integer> ctx) {
             output1.collect(record + 1);
             output2.collect(record - 1);
         }
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/functions/ProcessFunctionTest.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/functions/ProcessFunctionTest.java
index 600e03c26b0..423ecde16e8 100644
--- 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/functions/ProcessFunctionTest.java
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/functions/ProcessFunctionTest.java
@@ -59,7 +59,9 @@ public class ProcessFunctionTest {
 
                     @Override
                     public void processRecord(
-                            Integer record, Collector<Integer> output, 
PartitionedContext ctx)
+                            Integer record,
+                            Collector<Integer> output,
+                            PartitionedContext<Integer> ctx)
                             throws Exception {}
 
                     @Override
@@ -94,7 +96,9 @@ public class ProcessFunctionTest {
 
                     @Override
                     public void processRecordFromNonBroadcastInput(
-                            Integer record, Collector<Integer> output, 
PartitionedContext ctx)
+                            Integer record,
+                            Collector<Integer> output,
+                            PartitionedContext<Integer> ctx)
                             throws Exception {}
 
                     @Override
@@ -134,12 +138,16 @@ public class ProcessFunctionTest {
 
                     @Override
                     public void processRecordFromFirstInput(
-                            Integer record, Collector<Integer> output, 
PartitionedContext ctx)
+                            Integer record,
+                            Collector<Integer> output,
+                            PartitionedContext<Integer> ctx)
                             throws Exception {}
 
                     @Override
                     public void processRecordFromSecondInput(
-                            Integer record, Collector<Integer> output, 
PartitionedContext ctx)
+                            Integer record,
+                            Collector<Integer> output,
+                            PartitionedContext<Integer> ctx)
                             throws Exception {}
 
                     @Override
@@ -179,7 +187,7 @@ public class ProcessFunctionTest {
                             Integer record,
                             Collector<Integer> output1,
                             Collector<Integer> output2,
-                            TwoOutputPartitionedContext ctx)
+                            TwoOutputPartitionedContext<Integer, Integer> ctx)
                             throws Exception {}
 
                     @Override
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperatorTest.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperatorTest.java
index d744ba003a1..f3cd604c445 100644
--- 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperatorTest.java
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperatorTest.java
@@ -46,7 +46,7 @@ class KeyedProcessOperatorTest {
                             public void processRecord(
                                     Integer record,
                                     Collector<Integer> output,
-                                    PartitionedContext ctx) {
+                                    PartitionedContext<Integer> ctx) {
                                 output.collect(record + 1);
                             }
                         });
@@ -78,7 +78,7 @@ class KeyedProcessOperatorTest {
                             public void processRecord(
                                     Integer record,
                                     Collector<Integer> output,
-                                    PartitionedContext ctx) {
+                                    PartitionedContext<Integer> ctx) {
                                 // do nothing.
                             }
 
@@ -125,7 +125,7 @@ class KeyedProcessOperatorTest {
                             public void processRecord(
                                     Integer record,
                                     Collector<Integer> output,
-                                    PartitionedContext ctx) {
+                                    PartitionedContext<Integer> ctx) {
                                 // forward the record to check input key.
                                 output.collect(record);
                             }
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperatorTest.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperatorTest.java
index 7ad6070073b..c9e4b65276c 100644
--- 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperatorTest.java
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperatorTest.java
@@ -50,7 +50,7 @@ class KeyedTwoInputBroadcastProcessOperatorTest {
                             public void processRecordFromNonBroadcastInput(
                                     Integer record,
                                     Collector<Long> output,
-                                    PartitionedContext ctx) {
+                                    PartitionedContext<Long> ctx) {
                                 fromNonBroadcastInput.add(record);
                             }
 
@@ -89,7 +89,7 @@ class KeyedTwoInputBroadcastProcessOperatorTest {
                             public void processRecordFromNonBroadcastInput(
                                     Integer record,
                                     Collector<Long> output,
-                                    PartitionedContext ctx) {
+                                    PartitionedContext<Long> ctx) {
                                 //  do nothing.
                             }
 
@@ -161,7 +161,7 @@ class KeyedTwoInputBroadcastProcessOperatorTest {
                             public void processRecordFromNonBroadcastInput(
                                     Integer record,
                                     Collector<Long> output,
-                                    PartitionedContext ctx) {
+                                    PartitionedContext<Long> ctx) {
                                 output.collect(Long.valueOf(record));
                             }
 
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperatorTest.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperatorTest.java
index 75d9b616033..4ff5501768f 100644
--- 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperatorTest.java
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperatorTest.java
@@ -46,13 +46,15 @@ class KeyedTwoInputNonBroadcastProcessOperatorTest {
                             public void processRecordFromFirstInput(
                                     Integer record,
                                     Collector<Long> output,
-                                    PartitionedContext ctx) {
+                                    PartitionedContext<Long> ctx) {
                                 output.collect(Long.valueOf(record));
                             }
 
                             @Override
                             public void processRecordFromSecondInput(
-                                    Long record, Collector<Long> output, 
PartitionedContext ctx) {
+                                    Long record,
+                                    Collector<Long> output,
+                                    PartitionedContext<Long> ctx) {
                                 output.collect(record);
                             }
                         });
@@ -89,13 +91,15 @@ class KeyedTwoInputNonBroadcastProcessOperatorTest {
                             public void processRecordFromFirstInput(
                                     Integer record,
                                     Collector<Long> output,
-                                    PartitionedContext ctx) {
+                                    PartitionedContext<Long> ctx) {
                                 // do nothing.
                             }
 
                             @Override
                             public void processRecordFromSecondInput(
-                                    Long record, Collector<Long> output, 
PartitionedContext ctx) {
+                                    Long record,
+                                    Collector<Long> output,
+                                    PartitionedContext<Long> ctx) {
                                 // do nothing.
                             }
 
@@ -164,13 +168,15 @@ class KeyedTwoInputNonBroadcastProcessOperatorTest {
                             public void processRecordFromFirstInput(
                                     Integer record,
                                     Collector<Long> output,
-                                    PartitionedContext ctx) {
+                                    PartitionedContext<Long> ctx) {
                                 output.collect(Long.valueOf(record));
                             }
 
                             @Override
                             public void processRecordFromSecondInput(
-                                    Long record, Collector<Long> output, 
PartitionedContext ctx) {
+                                    Long record,
+                                    Collector<Long> output,
+                                    PartitionedContext<Long> ctx) {
                                 output.collect(record);
                             }
                         },
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperatorTest.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperatorTest.java
index 3de47a0c15d..7d0680325b9 100644
--- 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperatorTest.java
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperatorTest.java
@@ -52,7 +52,7 @@ class KeyedTwoOutputProcessOperatorTest {
                                     Integer record,
                                     Collector<Integer> output1,
                                     Collector<Long> output2,
-                                    TwoOutputPartitionedContext ctx) {
+                                    TwoOutputPartitionedContext<Integer, Long> 
ctx) {
                                 output1.collect(record);
                                 output2.collect((long) (record * 2));
                             }
@@ -93,7 +93,7 @@ class KeyedTwoOutputProcessOperatorTest {
                                     Integer record,
                                     Collector<Integer> output1,
                                     Collector<Long> output2,
-                                    TwoOutputPartitionedContext ctx) {
+                                    TwoOutputPartitionedContext<Integer, Long> 
ctx) {
                                 // do nothing.
                             }
 
@@ -147,7 +147,7 @@ class KeyedTwoOutputProcessOperatorTest {
                                     Integer record,
                                     Collector<Integer> output1,
                                     Collector<Long> output2,
-                                    TwoOutputPartitionedContext ctx) {
+                                    TwoOutputPartitionedContext<Integer, Long> 
ctx) {
                                 if (emitToFirstOutput.get()) {
                                     output1.collect(record);
                                 } else {
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockFreqCountProcessFunction.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockFreqCountProcessFunction.java
index 10ec3576c9d..dc4c527b9b7 100644
--- 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockFreqCountProcessFunction.java
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockFreqCountProcessFunction.java
@@ -44,7 +44,8 @@ public class MockFreqCountProcessFunction
     }
 
     @Override
-    public void processRecord(Integer record, Collector<Integer> output, 
PartitionedContext ctx)
+    public void processRecord(
+            Integer record, Collector<Integer> output, 
PartitionedContext<Integer> ctx)
             throws Exception {
         Optional<MapState<Integer, Integer>> stateOptional =
                 ctx.getStateManager().getState(mapStateDeclaration);
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockGlobalDecuplicateCountProcessFunction.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockGlobalDecuplicateCountProcessFunction.java
index aca147b3ae5..97aed0046cd 100644
--- 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockGlobalDecuplicateCountProcessFunction.java
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockGlobalDecuplicateCountProcessFunction.java
@@ -47,7 +47,8 @@ public class MockGlobalDecuplicateCountProcessFunction
     }
 
     @Override
-    public void processRecord(Integer record, Collector<Integer> output, 
PartitionedContext ctx)
+    public void processRecord(
+            Integer record, Collector<Integer> output, 
PartitionedContext<Integer> ctx)
             throws Exception {
         Optional<BroadcastState<Integer, Integer>> stateOptional =
                 ctx.getStateManager().getState(broadcastStateDeclaration);
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockGlobalListAppenderProcessFunction.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockGlobalListAppenderProcessFunction.java
index 53e214a9dcc..639b5d8cb8e 100644
--- 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockGlobalListAppenderProcessFunction.java
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockGlobalListAppenderProcessFunction.java
@@ -62,7 +62,8 @@ public class MockGlobalListAppenderProcessFunction
     }
 
     @Override
-    public void processRecord(Integer record, Collector<Integer> output, 
PartitionedContext ctx)
+    public void processRecord(
+            Integer record, Collector<Integer> output, 
PartitionedContext<Integer> ctx)
             throws Exception {
         Optional<ListState<Integer>> stateOptional =
                 ctx.getStateManager().getState(listStateDeclaration);
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockListAppenderProcessFunction.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockListAppenderProcessFunction.java
index 35c77576875..af89a50751a 100644
--- 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockListAppenderProcessFunction.java
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockListAppenderProcessFunction.java
@@ -61,7 +61,8 @@ public class MockListAppenderProcessFunction
     }
 
     @Override
-    public void processRecord(Integer record, Collector<Integer> output, 
PartitionedContext ctx)
+    public void processRecord(
+            Integer record, Collector<Integer> output, 
PartitionedContext<Integer> ctx)
             throws Exception {
         Optional<ListState<Integer>> stateOptional =
                 ctx.getStateManager().getState(listStateDeclaration);
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockMultiplierProcessFunction.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockMultiplierProcessFunction.java
index 20f8aadb7a4..0798461ec2c 100644
--- 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockMultiplierProcessFunction.java
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockMultiplierProcessFunction.java
@@ -44,7 +44,8 @@ public class MockMultiplierProcessFunction
     }
 
     @Override
-    public void processRecord(Integer record, Collector<Integer> output, 
PartitionedContext ctx)
+    public void processRecord(
+            Integer record, Collector<Integer> output, 
PartitionedContext<Integer> ctx)
             throws Exception {
         Optional<ValueState<Integer>> stateOptional =
                 ctx.getStateManager().getState(valueStateDeclaration);
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockRecudingMultiplierProcessFunction.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockRecudingMultiplierProcessFunction.java
index ca9d312b056..4aa22dcfc97 100644
--- 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockRecudingMultiplierProcessFunction.java
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockRecudingMultiplierProcessFunction.java
@@ -53,7 +53,8 @@ public class MockRecudingMultiplierProcessFunction
     }
 
     @Override
-    public void processRecord(Integer record, Collector<Integer> output, 
PartitionedContext ctx)
+    public void processRecord(
+            Integer record, Collector<Integer> output, 
PartitionedContext<Integer> ctx)
             throws Exception {
         Optional<ReducingState<Integer>> stateOptional =
                 ctx.getStateManager().getState(reducingStateDeclaration);
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockSumAggregateProcessFunction.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockSumAggregateProcessFunction.java
index 8a995dc010a..7b2661d02ea 100644
--- 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockSumAggregateProcessFunction.java
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockSumAggregateProcessFunction.java
@@ -69,7 +69,8 @@ public class MockSumAggregateProcessFunction
     }
 
     @Override
-    public void processRecord(Integer record, Collector<Integer> output, PartitionedContext ctx)
+    public void processRecord(
+            Integer record, Collector<Integer> output, PartitionedContext<Integer> ctx)
             throws Exception {
         Optional<AggregatingState<Integer, Integer>> stateOptional =
                 ctx.getStateManager().getState(aggregatingStateDeclaration);
diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/ProcessOperatorTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/ProcessOperatorTest.java
index fe4419b14e7..5723c84575c 100644
--- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/ProcessOperatorTest.java
+++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/ProcessOperatorTest.java
@@ -65,7 +65,7 @@ class ProcessOperatorTest {
                             public void processRecord(
                                     Integer record,
                                     Collector<String> output,
-                                    PartitionedContext ctx) {
+                                    PartitionedContext<String> ctx) {
                                 //  do nothing.
                             }
 
diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperatorTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperatorTest.java
index 1e50d7ff864..a738814280b 100644
--- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperatorTest.java
+++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperatorTest.java
@@ -48,7 +48,7 @@ class TwoInputBroadcastProcessOperatorTest {
                             public void processRecordFromNonBroadcastInput(
                                     Integer record,
                                     Collector<Long> output,
-                                    PartitionedContext ctx) {
+                                    PartitionedContext<Long> ctx) {
                                 fromNonBroadcastInput.add(Long.valueOf(record));
                             }
 
@@ -84,7 +84,7 @@ class TwoInputBroadcastProcessOperatorTest {
                             public void processRecordFromNonBroadcastInput(
                                     Integer record,
                                     Collector<Long> output,
-                                    PartitionedContext ctx) {
+                                    PartitionedContext<Long> ctx) {
                                 // do nothing.
                             }
 
diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperatorTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperatorTest.java
index e4774f3de8c..b05b0da43c2 100644
--- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperatorTest.java
+++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperatorTest.java
@@ -43,13 +43,15 @@ class TwoInputNonBroadcastProcessOperatorTest {
                             public void processRecordFromFirstInput(
                                     Integer record,
                                     Collector<Long> output,
-                                    PartitionedContext ctx) {
+                                    PartitionedContext<Long> ctx) {
                                 output.collect(Long.valueOf(record));
                             }
 
                             @Override
                             public void processRecordFromSecondInput(
-                                    Long record, Collector<Long> output, PartitionedContext ctx) {
+                                    Long record,
+                                    Collector<Long> output,
+                                    PartitionedContext<Long> ctx) {
                                 output.collect(record);
                             }
                         });
@@ -82,14 +84,18 @@ class TwoInputNonBroadcastProcessOperatorTest {
                         new TwoInputNonBroadcastStreamProcessFunction<Integer, Long, Long>() {
                             @Override
                             public void processRecordFromFirstInput(
-                                    Integer record, Collector<Long> output, PartitionedContext ctx)
+                                    Integer record,
+                                    Collector<Long> output,
+                                    PartitionedContext<Long> ctx)
                                     throws Exception {
                                 // do nothing.
                             }
 
                             @Override
                             public void processRecordFromSecondInput(
-                                    Long record, Collector<Long> output, PartitionedContext ctx)
+                                    Long record,
+                                    Collector<Long> output,
+                                    PartitionedContext<Long> ctx)
                                     throws Exception {
                                 // do nothing.
                             }
diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperatorTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperatorTest.java
index 7c182d90cc5..88f459d57e9 100644
--- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperatorTest.java
+++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperatorTest.java
@@ -48,7 +48,7 @@ class TwoOutputProcessOperatorTest {
                                     Integer record,
                                     Collector<Integer> output1,
                                     Collector<Long> output2,
-                                    TwoOutputPartitionedContext ctx) {
+                                    TwoOutputPartitionedContext<Integer, Long> ctx) {
                                 output1.collect(record);
                                 output2.collect((long) (record * 2));
                             }
@@ -86,7 +86,7 @@ class TwoOutputProcessOperatorTest {
                                     Integer record,
                                     Collector<Integer> output1,
                                     Collector<Long> output2,
-                                    TwoOutputPartitionedContext ctx) {
+                                    TwoOutputPartitionedContext<Integer, Long> ctx) {
                                 // do nothing.
                             }
 
diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/StreamTestUtils.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/StreamTestUtils.java
index 410a13c4c96..31d2d8cc07f 100644
--- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/StreamTestUtils.java
+++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/stream/StreamTestUtils.java
@@ -81,7 +81,8 @@ public final class StreamTestUtils {
         }
 
         @Override
-        public void processRecord(Integer record, Collector<Long> output, PartitionedContext ctx) {
+        public void processRecord(
+                Integer record, Collector<Long> output, PartitionedContext<Long> ctx) {
             // do nothing.
         }
     }
@@ -110,7 +111,7 @@ public final class StreamTestUtils {
                 Integer record,
                 Collector<Integer> output1,
                 Collector<Long> output2,
-                TwoOutputPartitionedContext ctx) {
+                TwoOutputPartitionedContext<Integer, Long> ctx) {
             //  do nothing.
         }
     }
@@ -139,13 +140,14 @@ public final class StreamTestUtils {
 
         @Override
         public void processRecordFromFirstInput(
-                Integer record, Collector<Long> output, PartitionedContext ctx) {
+                Integer record, Collector<Long> output, PartitionedContext<Long> ctx) {
             // do nothing.
         }
 
         @Override
         public void processRecordFromSecondInput(
-                Long record, Collector<Long> output, PartitionedContext ctx) throws Exception {
+                Long record, Collector<Long> output, PartitionedContext<Long> ctx)
+                throws Exception {
             // do nothing.
         }
     }
@@ -174,7 +176,7 @@ public final class StreamTestUtils {
 
         @Override
         public void processRecordFromNonBroadcastInput(
-                Long record, Collector<Long> output, PartitionedContext ctx) {
+                Long record, Collector<Long> output, PartitionedContext<Long> ctx) {
             // do nothing.
         }
 
diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/utils/StreamUtilsTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/utils/StreamUtilsTest.java
index b1baa37d9e2..74c23dfb5b4 100644
--- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/utils/StreamUtilsTest.java
+++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/utils/StreamUtilsTest.java
@@ -53,7 +53,9 @@ class StreamUtilsTest {
                         new OneInputStreamProcessFunction<Integer, Long>() {
                             @Override
                             public void processRecord(
-                                    Integer record, Collector<Long> output, PartitionedContext ctx)
+                                    Integer record,
+                                    Collector<Long> output,
+                                    PartitionedContext<Long> ctx)
                                     throws Exception {
                                 // ignore
                             }
@@ -83,14 +85,16 @@ class StreamUtilsTest {
                             public void processRecordFromFirstInput(
                                     Integer record,
                                     Collector<String> output,
-                                    PartitionedContext ctx)
+                                    PartitionedContext<String> ctx)
                                     throws Exception {
                                 // ignore
                             }
 
                             @Override
                             public void processRecordFromSecondInput(
-                                    Long record, Collector<String> output, PartitionedContext ctx)
+                                    Long record,
+                                    Collector<String> output,
+                                    PartitionedContext<String> ctx)
                                     throws Exception {
                                 // ignore
                             }
@@ -106,7 +110,7 @@ class StreamUtilsTest {
                             public void processRecordFromNonBroadcastInput(
                                     Integer record,
                                     Collector<String> output,
-                                    PartitionedContext ctx)
+                                    PartitionedContext<String> ctx)
                                     throws Exception {
                                 // ignore
                             }
@@ -133,7 +137,7 @@ class StreamUtilsTest {
                                     Integer record,
                                     Collector<Long> output1,
                                     Collector<String> output2,
-                                    TwoOutputPartitionedContext ctx)
+                                    TwoOutputPartitionedContext<Long, String> ctx)
                                     throws Exception {
                                 // ignore
                             }
diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/utils/WatermarkUtilsTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/utils/WatermarkUtilsTest.java
index 22e2fe7e3cb..7285957f875 100644
--- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/utils/WatermarkUtilsTest.java
+++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/utils/WatermarkUtilsTest.java
@@ -119,7 +119,7 @@ class WatermarkUtilsTest {
                             public void processRecord(
                                     Integer record,
                                     Collector<Integer> output,
-                                    PartitionedContext ctx)
+                                    PartitionedContext<Integer> ctx)
                                     throws Exception {}
 
                             @Override
@@ -137,7 +137,7 @@ class WatermarkUtilsTest {
                                             Integer record,
                                             Collector<Integer> output1,
                                             Collector<Integer> output2,
-                                            TwoOutputPartitionedContext ctx)
+                                            TwoOutputPartitionedContext<Integer, Integer> ctx)
                                             throws Exception {}
 
                                     @Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/StatefulDataStreamV2ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/StatefulDataStreamV2ITCase.java
index 646b7125d9b..51106835627 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/StatefulDataStreamV2ITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/StatefulDataStreamV2ITCase.java
@@ -166,7 +166,8 @@ class StatefulDataStreamV2ITCase {
         }
 
         @Override
-        public void processRecord(Long record, Collector<String> output, PartitionedContext ctx)
+        public void processRecord(
+                Long record, Collector<String> output, PartitionedContext<String> ctx)
                 throws Exception {
             Optional<AggregatingState<Long, Long>> maybeState =
                     ctx.getStateManager().getState(stateDeclaration);
@@ -191,7 +192,8 @@ class StatefulDataStreamV2ITCase {
         }
 
         @Override
-        public void processRecord(Long record, Collector<String> output, PartitionedContext ctx)
+        public void processRecord(
+                Long record, Collector<String> output, PartitionedContext<String> ctx)
                 throws Exception {
             Optional<ReducingState<Long>> maybeState =
                     ctx.getStateManager().getState(stateDeclaration);
@@ -219,7 +221,8 @@ class StatefulDataStreamV2ITCase {
         }
 
         @Override
-        public void processRecord(Long record, Collector<String> output, PartitionedContext ctx)
+        public void processRecord(
+                Long record, Collector<String> output, PartitionedContext<String> ctx)
                 throws Exception {
             Optional<MapState<Long, Long>> maybeState =
                     ctx.getStateManager().getState(stateDeclaration);
@@ -247,7 +250,8 @@ class StatefulDataStreamV2ITCase {
         }
 
         @Override
-        public void processRecord(Long record, Collector<String> output, PartitionedContext ctx)
+        public void processRecord(
+                Long record, Collector<String> output, PartitionedContext<String> ctx)
                 throws Exception {
             Optional<ListState<Long>> maybeState = ctx.getStateManager().getState(stateDeclaration);
             if (!maybeState.isPresent()) {
@@ -279,7 +283,8 @@ class StatefulDataStreamV2ITCase {
         }
 
         @Override
-        public void processRecord(Long record, Collector<String> output, PartitionedContext ctx)
+        public void processRecord(
+                Long record, Collector<String> output, PartitionedContext<String> ctx)
                 throws Exception {
             Optional<ValueState<Long>> maybeState =
                     ctx.getStateManager().getState(stateDeclaration);
@@ -308,7 +313,8 @@ class StatefulDataStreamV2ITCase {
         }
 
         @Override
-        public void processRecord(String record, Collector<Object> output, PartitionedContext ctx)
+        public void processRecord(
+                String record, Collector<Object> output, PartitionedContext<Object> ctx)
                 throws Exception {
             if (!allValues.contains(record)) {
                 throw new FlinkRuntimeException("Record not found: " + record);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkITCase.java
index 237af8bf894..e4a4db55991 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkITCase.java
@@ -653,7 +653,7 @@ class WatermarkITCase {
         }
 
         @Override
-        public void processRecord(Long record, Collector<Long> output, PartitionedContext ctx)
+        public void processRecord(Long record, Collector<Long> output, PartitionedContext<Long> ctx)
                 throws Exception {
             receivedRecords.add(record);
             output.collect(record * 2);
@@ -722,7 +722,7 @@ class WatermarkITCase {
         }
 
         @Override
-        public void processRecord(Long record, Collector<Long> output, PartitionedContext ctx)
+        public void processRecord(Long record, Collector<Long> output, PartitionedContext<Long> ctx)
                 throws Exception {
             receivedRecords.add(record);
             output.collect(record + 1);
@@ -769,7 +769,7 @@ class WatermarkITCase {
         }
 
         @Override
-        public void processRecord(Long record, Collector<Long> output, PartitionedContext ctx)
+        public void processRecord(Long record, Collector<Long> output, PartitionedContext<Long> ctx)
                 throws Exception {
             receivedRecords.add(record);
         }

Reply via email to