pnowojski commented on code in PR #27638:
URL: https://github.com/apache/flink/pull/27638#discussion_r2831948212
##########
flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java:
##########
@@ -501,6 +501,57 @@ void
testStateReportingForSingleSplitWatermarkAlignmentAndIdleness() throws Exce
assertThat(operator.getSplitMetricGroup(split0.splitId()).isActive()).isTrue();
}
+ @Test
+ void testAlignmentCheckIsDeferredForIdleSplits() throws Exception {
+ final long idleTimeout = 100;
+ final MockSourceReader sourceReader =
+ new MockSourceReader(WaitingForSplits.DO_NOT_WAIT_FOR_SPLITS,
true, true);
+ final TestProcessingTimeService processingTimeService = new
TestProcessingTimeService();
+ final SourceOperator<Integer, MockSourceSplit> operator =
+ createAndOpenSourceOperatorWithIdleness(
+ sourceReader, processingTimeService, idleTimeout);
+
+ final MockSourceSplit split0 = new MockSourceSplit(0, 0, 10);
+ final int allowedWatermark4 = 4;
+ final int allowedWatermark7 = 7;
+ split0.addRecord(5);
+ split0.addRecord(6);
+ split0.addRecord(7);
+ split0.addRecord(8);
+ operator.handleOperatorEvent(
+ new AddSplitEvent<>(Arrays.asList(split0), new
MockSourceSplitSerializer()));
+ final CollectingDataOutput<Integer> actualOutput = new
CollectingDataOutput<>();
+
+ // Emit enough record to fill the sampler buffer
+ operator.emitNext(actualOutput);
+ operator.emitNext(actualOutput);
+ operator.emitNext(actualOutput);
+ sampleAllWatermarks(processingTimeService);
+
+ // Transition the split to idle state:
+ for (int i = 0; i < 10; i++) {
+ processingTimeService.advance(idleTimeout);
+ }
+
assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue();
+
+ // Alignment check fires but doesn't pause the idle split
+ operator.handleOperatorEvent(new
WatermarkAlignmentEvent(allowedWatermark4));
+
assertThat(operator.getSplitMetricGroup(split0.splitId()).isIdle()).isTrue();
Review Comment:
Can we assert that the split hasn't been paused?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]