chucheng92 commented on code in PR #21464:
URL: https://github.com/apache/flink/pull/21464#discussion_r1057181722
##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java:
##########
@@ -216,6 +216,43 @@ public void testDefaultMethodDelegation() throws Exception
{
Mockito.verify(underlyingEnumeratorSpy).handleSourceEvent(0, se);
}
+ @Test
+ public void testInterceptNoMoreSplitEvent() {
+ context = new MockSplitEnumeratorContext<>(2);
+ source =
HybridSource.builder(MOCK_SOURCE).addSource(MOCK_SOURCE).build();
+
+ enumerator = (HybridSourceSplitEnumerator)
source.createEnumerator(context);
+ enumerator.start();
+ // The mock enumerator assigns splits once all readers are registered.
+ // At that point, the hasNoMoreSplit check will call context.signalIntermediateNoMoreSplits.
+ registerReader(context, enumerator, SUBTASK0);
+ registerReader(context, enumerator, SUBTASK1);
+ enumerator.handleSourceEvent(SUBTASK0, new
SourceReaderFinishedEvent(-1));
+ enumerator.handleSourceEvent(SUBTASK1, new
SourceReaderFinishedEvent(-1));
+ assertThat(context.hasNoMoreSplits(0)).isFalse();
+ assertThat(context.hasNoMoreSplits(1)).isFalse();
+ splitFromSource0 =
+
context.getSplitsAssignmentSequence().get(0).assignment().get(SUBTASK0).get(0);
+
+ // The tasks have finished reading; the hasNoMoreSplit check will call
+ // context.signalNoMoreSplits, since this is the final finished event.
+ enumerator.handleSourceEvent(SUBTASK0, new
SourceReaderFinishedEvent(0));
+ enumerator.handleSourceEvent(SUBTASK1, new
SourceReaderFinishedEvent(0));
+ assertThat(context.hasNoMoreSplits(0)).isTrue();
+ assertThat(context.hasNoMoreSplits(1)).isTrue();
+
+ // Test adding splits back: SUBTASK0 then restores the splitFromSource0 split.
+ // Reset the splits assignment and the previous subtaskHasNoMoreSplits flags.
+ context.getSplitsAssignmentSequence().clear();
+ Whitebox.setInternalState(context, "subtaskHasNoMoreSplits", new
boolean[] {false, false});
Review Comment:
@zhuzhurk thanks for your suggestion.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]