pnowojski commented on a change in pull request #14908:
URL: https://github.com/apache/flink/pull/14908#discussion_r574515479
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
##########
@@ -86,11 +89,44 @@
*/
public class SourceStreamTaskTest {
+ @Test
+ public void testInputEndedBeforeStopWithSavepointConfirmed() throws Exception {
+ CancelTestSource source =
+ new CancelTestSource(
+ STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), "src");
+ TestBoundedOneInputStreamOperator chainTail = new TestBoundedOneInputStreamOperator("t");
+ StreamTaskMailboxTestHarness<String> harness =
+ new StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, STRING_TYPE_INFO)
+ .setupOperatorChain(
+ new OperatorID(),
+ new StreamSource<String, CancelTestSource>(source))
+ .chain(
+ new OperatorID(),
+ chainTail,
+ STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+ .finish()
+ .build();
+ Future<Boolean> triggerFuture =
+ harness.streamTask.triggerCheckpointAsync(
+ new CheckpointMetaData(1, 1),
+ new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
+ false);
+ while (!triggerFuture.isDone()) {
+ harness.streamTask.runMailboxStep();
+ }
+ // stopping the input should be treated as "true" end of input
+ // because checkpoint completion notification will not be sent
Review comment:
```
// Instead of completing stop-with-savepoint via a `notifyCheckpointCompleted` call,
// we simulate the source finishing first. As a result, we expect that
// endOfInput has been issued.
```
?