zhuzhurk commented on code in PR #21621:
URL: https://github.com/apache/flink/pull/21621#discussion_r1070900001
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java:
##########
@@ -86,6 +118,71 @@ public static <T> ThrowingConsumer<StreamRecord<T>,
Exception> getRecordProcesso
}
}
+ private static boolean canOmitSetKeyContext(
+ AbstractStreamOperator<?> streamOperator, int input) {
+ // Since AbstractStreamOperator is @PublicEvolving, we need to check
whether the
+ // "SetKeyContextElement" is overridden by the (user-implemented)
subclass. If it is
+ // overridden, we cannot omit it due to the subclass may maintain
different key selectors on
+ // its own.
+ return !hasKeyContext(streamOperator, input)
+ && !methodSetKeyContextIsOverride(streamOperator, input);
+ }
+
+ private static boolean hasKeyContext(AbstractStreamOperator<?> operator,
int input) {
+ if (input == 0) {
+ return operator.hasKeyContext1();
+ } else {
+ return operator.hasKeyContext2();
+ }
+ }
+
+ private static boolean methodSetKeyContextIsOverride(
+ AbstractStreamOperator<?> operator, int input) {
+ if (input == 0) {
+ if (operator instanceof OneInputStreamOperator) {
+ return methodIsOverride(
+ operator,
+ OneInputStreamOperator.class,
+ METHOD_SET_KEY_CONTEXT_ELEMENT,
+ StreamRecord.class)
+ || methodIsOverride(
+ operator,
+ AbstractStreamOperator.class,
+ METHOD_SET_KEY_CONTEXT_ELEMENT1,
+ StreamRecord.class);
+ } else {
+ return methodIsOverride(
+ operator,
+ AbstractStreamOperator.class,
+ METHOD_SET_KEY_CONTEXT_ELEMENT1,
+ StreamRecord.class);
+ }
+ } else {
+ return methodIsOverride(
+ operator,
+ AbstractStreamOperator.class,
+ METHOD_SET_KEY_CONTEXT_ELEMENT2,
+ StreamRecord.class);
+ }
+ }
+
+ private static boolean methodIsOverride(
Review Comment:
methodIsOverride -> methodIsOverridden
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java:
##########
@@ -86,6 +118,71 @@ public static <T> ThrowingConsumer<StreamRecord<T>,
Exception> getRecordProcesso
}
}
+ private static boolean canOmitSetKeyContext(
+ AbstractStreamOperator<?> streamOperator, int input) {
+ // Since AbstractStreamOperator is @PublicEvolving, we need to check
whether the
+ // "SetKeyContextElement" is overridden by the (user-implemented)
subclass. If it is
+ // overridden, we cannot omit it due to the subclass may maintain
different key selectors on
+ // its own.
+ return !hasKeyContext(streamOperator, input)
+ && !methodSetKeyContextIsOverride(streamOperator, input);
+ }
+
+ private static boolean hasKeyContext(AbstractStreamOperator<?> operator,
int input) {
+ if (input == 0) {
+ return operator.hasKeyContext1();
+ } else {
+ return operator.hasKeyContext2();
+ }
+ }
+
+ private static boolean methodSetKeyContextIsOverride(
Review Comment:
methodSetKeyContextIsOverride -> methodSetKeyContextElementIsOverridden
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtilsTest.java:
##########
@@ -95,6 +100,163 @@ void testGetRecordProcessor2() throws Exception {
assertThat(operator3.processElement2Called).isTrue();
}
+ @Test
+ void testOverrideSetKeyContextElementForOneInputStreamOperator() throws
Exception {
+ // test no override
+ NoOverrideOneInputStreamOperator noOverride = new
NoOverrideOneInputStreamOperator();
+ RecordProcessorUtils.getRecordProcessor(noOverride).accept(new
StreamRecord<>("test"));
+ assertThat(noOverride.setCurrentKeyCalled).isFalse();
+
+ // test override "SetKeyContextElement"
+ OverrideSetKeyContextOneInputStreamOperator overrideSetKeyContext =
+ new OverrideSetKeyContextOneInputStreamOperator();
+ RecordProcessorUtils.getRecordProcessor(overrideSetKeyContext)
+ .accept(new StreamRecord<>("test"));
+ assertThat(overrideSetKeyContext.setKeyContextElementCalled).isTrue();
+
+ // test override "SetKeyContextElement1"
+ OverrideSetKeyContext1OneInputStreamOperator overrideSetKeyContext1 =
+ new OverrideSetKeyContext1OneInputStreamOperator();
+ RecordProcessorUtils.getRecordProcessor(overrideSetKeyContext1)
+ .accept(new StreamRecord<>("test"));
+
assertThat(overrideSetKeyContext1.setKeyContextElement1Called).isTrue();
+ }
+
+ @Test
+ void testOverrideSetKeyContextElementForTwoInputStreamOperator() throws
Exception {
+ // test no override
+ NoOverrideTwoInputStreamOperator noOverride = new
NoOverrideTwoInputStreamOperator();
+ RecordProcessorUtils.getRecordProcessor1(noOverride).accept(new
StreamRecord<>("test"));
+ RecordProcessorUtils.getRecordProcessor2(noOverride).accept(new
StreamRecord<>("test"));
+ assertThat(noOverride.setCurrentKeyCalled).isFalse();
+
+ // test override "SetKeyContextElement1" and "SetKeyContextElement2"
+ OverrideSetKeyContext1AndSetKeyContext2TwoInputStreamOperator override
=
+ new
OverrideSetKeyContext1AndSetKeyContext2TwoInputStreamOperator();
+ RecordProcessorUtils.getRecordProcessor1(override).accept(new
StreamRecord<>("test"));
+ RecordProcessorUtils.getRecordProcessor2(override).accept(new
StreamRecord<>("test"));
+ assertThat(override.setKeyContextElement1Called).isTrue();
+ assertThat(override.setKeyContextElement2Called).isTrue();
+ }
+
+ private static class NoOverrideOperator extends
AbstractStreamOperator<String> {
+
+ boolean setCurrentKeyCalled = false;
+
+ NoOverrideOperator() throws NoSuchFieldException,
IllegalAccessException {
+ super();
+ // For case that "SetKeyContextElement" has not been overridden,
+ // we can determine whether the "SetKeyContextElement" is called
through
+ // "setCurrentKey". According to the implementation, we need to
make the
+ // "stateKeySelector1/stateKeySelector2" not null. Besides, we
override the
+ // "hasKeyContext1" and "hasKeyContext2" to avoid
"stateKeySelector1/stateKeySelector2"
+ // from affecting the return value
Review Comment:
Is is possible to invoke `setup(...)` to initialize the key selectors in a
valid way?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]