gaoyunhaii commented on a change in pull request #15055:
URL: https://github.com/apache/flink/pull/15055#discussion_r657809142
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -852,49 +854,74 @@ public void testLatencyMarker() throws Exception {
@Test
public void testRpcTriggerCheckpointWithoutSourceChain() throws Exception {
- AtomicReference<Future<?>> lastCheckpointTriggerFuture = new
AtomicReference<>();
+ ResultPartition[] partitionWriters = new ResultPartition[2];
+ try {
+ for (int i = 0; i < partitionWriters.length; ++i) {
+ partitionWriters[i] =
+
PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
+ partitionWriters[i].setup();
+ }
- try (StreamTaskMailboxTestHarness<String> testHarness =
- new StreamTaskMailboxTestHarnessBuilder<>(
- env ->
- new
HoldingOnAfterInvokeMultipleInputStreamTask(
- env,
lastCheckpointTriggerFuture),
- BasicTypeInfo.STRING_TYPE_INFO)
- .addInput(BasicTypeInfo.STRING_TYPE_INFO)
- .addInput(BasicTypeInfo.INT_TYPE_INFO)
- .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
- .modifyStreamConfig(config ->
config.setCheckpointingEnabled(true))
- .setupOperatorChain(new
MapToStringMultipleInputOperatorFactory(3))
-
.finishForSingletonOperatorChain(StringSerializer.INSTANCE)
- .build()) {
+ try (StreamTaskMailboxTestHarness<String> testHarness =
+ new StreamTaskMailboxTestHarnessBuilder<>(
+ MultipleInputStreamTask::new,
BasicTypeInfo.STRING_TYPE_INFO)
+ .addInput(BasicTypeInfo.STRING_TYPE_INFO)
+ .addInput(BasicTypeInfo.INT_TYPE_INFO)
+ .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
+ .modifyStreamConfig(config ->
config.setCheckpointingEnabled(true))
+ .setupOperatorChain(new
MapToStringMultipleInputOperatorFactory(3))
+ .setNumberOfNonChainedOutputs(1 +
partitionWriters.length)
+
.finishForSingletonOperatorChain(StringSerializer.INSTANCE)
+ .addAdditionalOutput(partitionWriters)
+ .build()) {
- testHarness
- .getStreamTask()
- .getCheckpointCoordinator()
- .setEnableCheckpointAfterTasksFinished(true);
-
- // Tests triggering checkpoint when all the inputs are alive.
- Future<Boolean> checkpointFuture = triggerCheckpoint(testHarness,
2);
- processMailTillCheckpointSuccess(testHarness, checkpointFuture);
- assertEquals(2,
testHarness.getTaskStateManager().getReportedCheckpointId());
-
- // Tests trigger checkpoint after some inputs have received
EndOfPartition
- testHarness.processEvent(EndOfPartitionEvent.INSTANCE, 0, 0);
- checkpointFuture = triggerCheckpoint(testHarness, 4);
- processMailTillCheckpointSuccess(testHarness, checkpointFuture);
- assertEquals(4,
testHarness.getTaskStateManager().getReportedCheckpointId());
-
- // Tests trigger checkpoint after all the inputs have received
EndOfPartition.
- testHarness.processEvent(EndOfPartitionEvent.INSTANCE, 1, 0);
- testHarness.processEvent(EndOfPartitionEvent.INSTANCE, 2, 0);
- checkpointFuture = triggerCheckpoint(testHarness, 6);
- lastCheckpointTriggerFuture.set(checkpointFuture);
-
- // The checkpoint 6 would be triggered successfully.
- // TODO: Would also check the checkpoint succeed after we also
waiting
- // for the asynchronous step to finish on finish.
- testHarness.finishProcessing();
- assertTrue(checkpointFuture.isDone());
+ testHarness
+ .getStreamTask()
+ .getCheckpointCoordinator()
+ .setEnableCheckpointAfterTasksFinished(true);
+
+ // Tests triggering checkpoint when all the inputs are alive.
+ Future<Boolean> checkpointFuture =
triggerCheckpoint(testHarness, 2);
+ processMailTillCheckpointSuccess(testHarness,
checkpointFuture);
+ assertEquals(2,
testHarness.getTaskStateManager().getReportedCheckpointId());
+
+ // Tests triggering checkpoint after some inputs have received
EndOfPartition.
+ testHarness.processEvent(EndOfPartitionEvent.INSTANCE, 0, 0);
+ checkpointFuture = triggerCheckpoint(testHarness, 4);
+ processMailTillCheckpointSuccess(testHarness,
checkpointFuture);
+ assertEquals(4,
testHarness.getTaskStateManager().getReportedCheckpointId());
+
+ // Simulates the netty thread reports the downstream tasks
have processed all the
+ // records.
+ new Thread(
Review comment:
I modified the tests so that once the `Future<Boolean>` representing the
checkpoint trigger is done, the `ResultPartition` is notified that all user
records have been processed; thus we no longer need another thread.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]