rkhachatryan commented on a change in pull request #14908:
URL: https://github.com/apache/flink/pull/14908#discussion_r574523353
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
##########
@@ -86,11 +89,44 @@
*/
public class SourceStreamTaskTest {
+    @Test
+    public void testInputEndedBeforeStopWithSavepointConfirmed() throws Exception {
+        CancelTestSource source =
+                new CancelTestSource(
+                        STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), "src");
+        TestBoundedOneInputStreamOperator chainTail = new TestBoundedOneInputStreamOperator("t");
+        StreamTaskMailboxTestHarness<String> harness =
+                new StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, STRING_TYPE_INFO)
+                        .setupOperatorChain(
+                                new OperatorID(),
+                                new StreamSource<String, CancelTestSource>(source))
+                        .chain(
+                                new OperatorID(),
+                                chainTail,
+                                STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
+                        .finish()
+                        .build();
+        Future<Boolean> triggerFuture =
+                harness.streamTask.triggerCheckpointAsync(
+                        new CheckpointMetaData(1, 1),
+                        new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
+                        false);
+        while (!triggerFuture.isDone()) {
+            harness.streamTask.runMailboxStep();
+        }
+        // stopping the input should be treated as "true" end of input
+        // because checkpoint completion notification will not be sent
Review comment:
I'll change it, thanks!