gaoyunhaii commented on a change in pull request #15055:
URL: https://github.com/apache/flink/pull/15055#discussion_r657809142



##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -852,49 +854,74 @@ public void testLatencyMarker() throws Exception {
 
     @Test
     public void testRpcTriggerCheckpointWithoutSourceChain() throws Exception {
-        AtomicReference<Future<?>> lastCheckpointTriggerFuture = new 
AtomicReference<>();
+        ResultPartition[] partitionWriters = new ResultPartition[2];
+        try {
+            for (int i = 0; i < partitionWriters.length; ++i) {
+                partitionWriters[i] =
+                        
PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
+                partitionWriters[i].setup();
+            }
 
-        try (StreamTaskMailboxTestHarness<String> testHarness =
-                new StreamTaskMailboxTestHarnessBuilder<>(
-                                env ->
-                                        new 
HoldingOnAfterInvokeMultipleInputStreamTask(
-                                                env, 
lastCheckpointTriggerFuture),
-                                BasicTypeInfo.STRING_TYPE_INFO)
-                        .addInput(BasicTypeInfo.STRING_TYPE_INFO)
-                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
-                        .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
-                        .modifyStreamConfig(config -> 
config.setCheckpointingEnabled(true))
-                        .setupOperatorChain(new 
MapToStringMultipleInputOperatorFactory(3))
-                        
.finishForSingletonOperatorChain(StringSerializer.INSTANCE)
-                        .build()) {
+            try (StreamTaskMailboxTestHarness<String> testHarness =
+                    new StreamTaskMailboxTestHarnessBuilder<>(
+                                    MultipleInputStreamTask::new, 
BasicTypeInfo.STRING_TYPE_INFO)
+                            .addInput(BasicTypeInfo.STRING_TYPE_INFO)
+                            .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                            .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
+                            .modifyStreamConfig(config -> 
config.setCheckpointingEnabled(true))
+                            .setupOperatorChain(new 
MapToStringMultipleInputOperatorFactory(3))
+                            .setNumberOfNonChainedOutputs(1 + 
partitionWriters.length)
+                            
.finishForSingletonOperatorChain(StringSerializer.INSTANCE)
+                            .addAdditionalOutput(partitionWriters)
+                            .build()) {
 
-            testHarness
-                    .getStreamTask()
-                    .getCheckpointCoordinator()
-                    .setEnableCheckpointAfterTasksFinished(true);
-
-            // Tests triggering checkpoint when all the inputs are alive.
-            Future<Boolean> checkpointFuture = triggerCheckpoint(testHarness, 
2);
-            processMailTillCheckpointSuccess(testHarness, checkpointFuture);
-            assertEquals(2, 
testHarness.getTaskStateManager().getReportedCheckpointId());
-
-            // Tests trigger checkpoint after some inputs have received 
EndOfPartition
-            testHarness.processEvent(EndOfPartitionEvent.INSTANCE, 0, 0);
-            checkpointFuture = triggerCheckpoint(testHarness, 4);
-            processMailTillCheckpointSuccess(testHarness, checkpointFuture);
-            assertEquals(4, 
testHarness.getTaskStateManager().getReportedCheckpointId());
-
-            // Tests trigger checkpoint after all the inputs have received 
EndOfPartition.
-            testHarness.processEvent(EndOfPartitionEvent.INSTANCE, 1, 0);
-            testHarness.processEvent(EndOfPartitionEvent.INSTANCE, 2, 0);
-            checkpointFuture = triggerCheckpoint(testHarness, 6);
-            lastCheckpointTriggerFuture.set(checkpointFuture);
-
-            // The checkpoint 6 would be triggered successfully.
-            // TODO: Would also check the checkpoint succeed after we also 
waiting
-            // for the asynchronous step to finish on finish.
-            testHarness.finishProcessing();
-            assertTrue(checkpointFuture.isDone());
+                testHarness
+                        .getStreamTask()
+                        .getCheckpointCoordinator()
+                        .setEnableCheckpointAfterTasksFinished(true);
+
+                // Tests triggering checkpoint when all the inputs are alive.
+                Future<Boolean> checkpointFuture = 
triggerCheckpoint(testHarness, 2);
+                processMailTillCheckpointSuccess(testHarness, 
checkpointFuture);
+                assertEquals(2, 
testHarness.getTaskStateManager().getReportedCheckpointId());
+
+                // Tests triggering checkpoint after some inputs have received 
EndOfPartition.
+                testHarness.processEvent(EndOfPartitionEvent.INSTANCE, 0, 0);
+                checkpointFuture = triggerCheckpoint(testHarness, 4);
+                processMailTillCheckpointSuccess(testHarness, 
checkpointFuture);
+                assertEquals(4, 
testHarness.getTaskStateManager().getReportedCheckpointId());
+
+                // Simulates the netty thread reports the downstream tasks 
have processed all the
+                // records.
+                new Thread(

Review comment:
       I modified the tests so that once the `Future<Boolean>` representing the 
checkpoint trigger is done, the `ResultPartition` is notified that all user 
records are processed; thus we no longer need another thread.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to