wanglijie95 commented on code in PR #21621:
URL: https://github.com/apache/flink/pull/21621#discussion_r1070990867


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtilsTest.java:
##########
@@ -95,6 +100,163 @@ void testGetRecordProcessor2() throws Exception {
         assertThat(operator3.processElement2Called).isTrue();
     }
 
+    @Test
+    void testOverrideSetKeyContextElementForOneInputStreamOperator() throws 
Exception {
+        // test no override
+        NoOverrideOneInputStreamOperator noOverride = new 
NoOverrideOneInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor(noOverride).accept(new 
StreamRecord<>("test"));
+        assertThat(noOverride.setCurrentKeyCalled).isFalse();
+
+        // test override "SetKeyContextElement"
+        OverrideSetKeyContextOneInputStreamOperator overrideSetKeyContext =
+                new OverrideSetKeyContextOneInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor(overrideSetKeyContext)
+                .accept(new StreamRecord<>("test"));
+        assertThat(overrideSetKeyContext.setKeyContextElementCalled).isTrue();
+
+        // test override "SetKeyContextElement1"
+        OverrideSetKeyContext1OneInputStreamOperator overrideSetKeyContext1 =
+                new OverrideSetKeyContext1OneInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor(overrideSetKeyContext1)
+                .accept(new StreamRecord<>("test"));
+        
assertThat(overrideSetKeyContext1.setKeyContextElement1Called).isTrue();
+    }
+
+    @Test
+    void testOverrideSetKeyContextElementForTwoInputStreamOperator() throws 
Exception {
+        // test no override
+        NoOverrideTwoInputStreamOperator noOverride = new 
NoOverrideTwoInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor1(noOverride).accept(new 
StreamRecord<>("test"));
+        RecordProcessorUtils.getRecordProcessor2(noOverride).accept(new 
StreamRecord<>("test"));
+        assertThat(noOverride.setCurrentKeyCalled).isFalse();
+
+        // test override "SetKeyContextElement1" and "SetKeyContextElement2"
+        OverrideSetKeyContext1AndSetKeyContext2TwoInputStreamOperator override 
=
+                new 
OverrideSetKeyContext1AndSetKeyContext2TwoInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor1(override).accept(new 
StreamRecord<>("test"));
+        RecordProcessorUtils.getRecordProcessor2(override).accept(new 
StreamRecord<>("test"));
+        assertThat(override.setKeyContextElement1Called).isTrue();
+        assertThat(override.setKeyContextElement2Called).isTrue();
+    }
+
+    private static class NoOverrideOperator extends 
AbstractStreamOperator<String> {
+
+        boolean setCurrentKeyCalled = false;
+
+        NoOverrideOperator() throws NoSuchFieldException, 
IllegalAccessException {
+            super();
+            // For case that "SetKeyContextElement" has not been overridden,
+            // we can determine whether the "SetKeyContextElement" is called 
through
+            // "setCurrentKey". According to the implementation, we need to 
make the
+            // "stateKeySelector1/stateKeySelector2" not null. Besides, we 
override the
+            // "hasKeyContext1" and "hasKeyContext2" to avoid 
"stateKeySelector1/stateKeySelector2"
+            // from affecting the return value

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to