wanglijie95 commented on code in PR #21621:
URL: https://github.com/apache/flink/pull/21621#discussion_r1070990867
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtilsTest.java:
##########
@@ -95,6 +100,163 @@ void testGetRecordProcessor2() throws Exception {
assertThat(operator3.processElement2Called).isTrue();
}
+ @Test
+ void testOverrideSetKeyContextElementForOneInputStreamOperator() throws
Exception {
+ // test no override
+ NoOverrideOneInputStreamOperator noOverride = new
NoOverrideOneInputStreamOperator();
+ RecordProcessorUtils.getRecordProcessor(noOverride).accept(new
StreamRecord<>("test"));
+ assertThat(noOverride.setCurrentKeyCalled).isFalse();
+
+ // test override "SetKeyContextElement"
+ OverrideSetKeyContextOneInputStreamOperator overrideSetKeyContext =
+ new OverrideSetKeyContextOneInputStreamOperator();
+ RecordProcessorUtils.getRecordProcessor(overrideSetKeyContext)
+ .accept(new StreamRecord<>("test"));
+ assertThat(overrideSetKeyContext.setKeyContextElementCalled).isTrue();
+
+ // test override "SetKeyContextElement1"
+ OverrideSetKeyContext1OneInputStreamOperator overrideSetKeyContext1 =
+ new OverrideSetKeyContext1OneInputStreamOperator();
+ RecordProcessorUtils.getRecordProcessor(overrideSetKeyContext1)
+ .accept(new StreamRecord<>("test"));
+
assertThat(overrideSetKeyContext1.setKeyContextElement1Called).isTrue();
+ }
+
+ @Test
+ void testOverrideSetKeyContextElementForTwoInputStreamOperator() throws
Exception {
+ // test no override
+ NoOverrideTwoInputStreamOperator noOverride = new
NoOverrideTwoInputStreamOperator();
+ RecordProcessorUtils.getRecordProcessor1(noOverride).accept(new
StreamRecord<>("test"));
+ RecordProcessorUtils.getRecordProcessor2(noOverride).accept(new
StreamRecord<>("test"));
+ assertThat(noOverride.setCurrentKeyCalled).isFalse();
+
+ // test override "SetKeyContextElement1" and "SetKeyContextElement2"
+ OverrideSetKeyContext1AndSetKeyContext2TwoInputStreamOperator override
=
+ new
OverrideSetKeyContext1AndSetKeyContext2TwoInputStreamOperator();
+ RecordProcessorUtils.getRecordProcessor1(override).accept(new
StreamRecord<>("test"));
+ RecordProcessorUtils.getRecordProcessor2(override).accept(new
StreamRecord<>("test"));
+ assertThat(override.setKeyContextElement1Called).isTrue();
+ assertThat(override.setKeyContextElement2Called).isTrue();
+ }
+
+ private static class NoOverrideOperator extends
AbstractStreamOperator<String> {
+
+ boolean setCurrentKeyCalled = false;
+
+ NoOverrideOperator() throws NoSuchFieldException,
IllegalAccessException {
+ super();
+ // For case that "SetKeyContextElement" has not been overridden,
+ // we can determine whether the "SetKeyContextElement" is called
through
+ // "setCurrentKey". According to the implementation, we need to
make the
+ // "stateKeySelector1/stateKeySelector2" not null. Besides, we
override the
+ // "hasKeyContext1" and "hasKeyContext2" to avoid
"stateKeySelector1/stateKeySelector2"
+ // from affecting the return value
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]