zhuzhurk commented on code in PR #21621:
URL: https://github.com/apache/flink/pull/21621#discussion_r1070900001


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java:
##########
@@ -86,6 +118,71 @@ public static <T> ThrowingConsumer<StreamRecord<T>, 
Exception> getRecordProcesso
         }
     }
 
+    private static boolean canOmitSetKeyContext(
+            AbstractStreamOperator<?> streamOperator, int input) {
+        // Since AbstractStreamOperator is @PublicEvolving, we need to check 
whether the
+        // "SetKeyContextElement" is overridden by the (user-implemented) 
subclass. If it is
+        // overridden, we cannot omit it due to the subclass may maintain 
different key selectors on
+        // its own.
+        return !hasKeyContext(streamOperator, input)
+                && !methodSetKeyContextIsOverride(streamOperator, input);
+    }
+
+    private static boolean hasKeyContext(AbstractStreamOperator<?> operator, 
int input) {
+        if (input == 0) {
+            return operator.hasKeyContext1();
+        } else {
+            return operator.hasKeyContext2();
+        }
+    }
+
+    private static boolean methodSetKeyContextIsOverride(
+            AbstractStreamOperator<?> operator, int input) {
+        if (input == 0) {
+            if (operator instanceof OneInputStreamOperator) {
+                return methodIsOverride(
+                                operator,
+                                OneInputStreamOperator.class,
+                                METHOD_SET_KEY_CONTEXT_ELEMENT,
+                                StreamRecord.class)
+                        || methodIsOverride(
+                                operator,
+                                AbstractStreamOperator.class,
+                                METHOD_SET_KEY_CONTEXT_ELEMENT1,
+                                StreamRecord.class);
+            } else {
+                return methodIsOverride(
+                        operator,
+                        AbstractStreamOperator.class,
+                        METHOD_SET_KEY_CONTEXT_ELEMENT1,
+                        StreamRecord.class);
+            }
+        } else {
+            return methodIsOverride(
+                    operator,
+                    AbstractStreamOperator.class,
+                    METHOD_SET_KEY_CONTEXT_ELEMENT2,
+                    StreamRecord.class);
+        }
+    }
+
+    private static boolean methodIsOverride(

Review Comment:
   methodIsOverride -> methodIsOverridden



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java:
##########
@@ -86,6 +118,71 @@ public static <T> ThrowingConsumer<StreamRecord<T>, 
Exception> getRecordProcesso
         }
     }
 
+    private static boolean canOmitSetKeyContext(
+            AbstractStreamOperator<?> streamOperator, int input) {
+        // Since AbstractStreamOperator is @PublicEvolving, we need to check 
whether the
+        // "SetKeyContextElement" is overridden by the (user-implemented) 
subclass. If it is
+        // overridden, we cannot omit it due to the subclass may maintain 
different key selectors on
+        // its own.
+        return !hasKeyContext(streamOperator, input)
+                && !methodSetKeyContextIsOverride(streamOperator, input);
+    }
+
+    private static boolean hasKeyContext(AbstractStreamOperator<?> operator, 
int input) {
+        if (input == 0) {
+            return operator.hasKeyContext1();
+        } else {
+            return operator.hasKeyContext2();
+        }
+    }
+
+    private static boolean methodSetKeyContextIsOverride(

Review Comment:
   methodSetKeyContextIsOverride -> methodSetKeyContextElementIsOverridden



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtilsTest.java:
##########
@@ -95,6 +100,163 @@ void testGetRecordProcessor2() throws Exception {
         assertThat(operator3.processElement2Called).isTrue();
     }
 
+    @Test
+    void testOverrideSetKeyContextElementForOneInputStreamOperator() throws 
Exception {
+        // test no override
+        NoOverrideOneInputStreamOperator noOverride = new 
NoOverrideOneInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor(noOverride).accept(new 
StreamRecord<>("test"));
+        assertThat(noOverride.setCurrentKeyCalled).isFalse();
+
+        // test override "SetKeyContextElement"
+        OverrideSetKeyContextOneInputStreamOperator overrideSetKeyContext =
+                new OverrideSetKeyContextOneInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor(overrideSetKeyContext)
+                .accept(new StreamRecord<>("test"));
+        assertThat(overrideSetKeyContext.setKeyContextElementCalled).isTrue();
+
+        // test override "SetKeyContextElement1"
+        OverrideSetKeyContext1OneInputStreamOperator overrideSetKeyContext1 =
+                new OverrideSetKeyContext1OneInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor(overrideSetKeyContext1)
+                .accept(new StreamRecord<>("test"));
+        
assertThat(overrideSetKeyContext1.setKeyContextElement1Called).isTrue();
+    }
+
+    @Test
+    void testOverrideSetKeyContextElementForTwoInputStreamOperator() throws 
Exception {
+        // test no override
+        NoOverrideTwoInputStreamOperator noOverride = new 
NoOverrideTwoInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor1(noOverride).accept(new 
StreamRecord<>("test"));
+        RecordProcessorUtils.getRecordProcessor2(noOverride).accept(new 
StreamRecord<>("test"));
+        assertThat(noOverride.setCurrentKeyCalled).isFalse();
+
+        // test override "SetKeyContextElement1" and "SetKeyContextElement2"
+        OverrideSetKeyContext1AndSetKeyContext2TwoInputStreamOperator override 
=
+                new 
OverrideSetKeyContext1AndSetKeyContext2TwoInputStreamOperator();
+        RecordProcessorUtils.getRecordProcessor1(override).accept(new 
StreamRecord<>("test"));
+        RecordProcessorUtils.getRecordProcessor2(override).accept(new 
StreamRecord<>("test"));
+        assertThat(override.setKeyContextElement1Called).isTrue();
+        assertThat(override.setKeyContextElement2Called).isTrue();
+    }
+
+    private static class NoOverrideOperator extends 
AbstractStreamOperator<String> {
+
+        boolean setCurrentKeyCalled = false;
+
+        NoOverrideOperator() throws NoSuchFieldException, 
IllegalAccessException {
+            super();
+            // For case that "SetKeyContextElement" has not been overridden,
+            // we can determine whether the "SetKeyContextElement" is called 
through
+            // "setCurrentKey". According to the implementation, we need to 
make the
+            // "stateKeySelector1/stateKeySelector2" not null. Besides, we 
override the
+            // "hasKeyContext1" and "hasKeyContext2" to avoid 
"stateKeySelector1/stateKeySelector2"
+            // from affecting the return value

Review Comment:
   Is it possible to invoke `setup(...)` to initialize the key selectors in a 
valid way?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to