pnowojski commented on a change in pull request #15055:
URL: https://github.com/apache/flink/pull/15055#discussion_r661973155



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1729,43 +1731,117 @@ protected void cancelTask() {
     }
 
     @Test
-    public void testTriggeringCheckpointWithFinishedChannels() throws Exception {
-        AtomicReference<Future<?>> lastCheckpointTriggerFuture = new AtomicReference<>();
+    public void testNotWaitingForAllRecordsProcessedIfCheckpointNotEnabled() throws Exception {
+        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[2];
+        try {
+            for (int i = 0; i < partitionWriters.length; ++i) {
+                partitionWriters[i] =
+                        PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
+                partitionWriters[i].setup();
+            }
 
-        try (StreamTaskMailboxTestHarness<String> testHarness =
-                new StreamTaskMailboxTestHarnessBuilder<>(
-                                env ->
-                                        new HoldingOnAfterInvokeStreamTask(
-                                                env, lastCheckpointTriggerFuture),
-                                BasicTypeInfo.STRING_TYPE_INFO)
-                        .addInput(BasicTypeInfo.STRING_TYPE_INFO, 3)
-                        .setupOutputForSingletonOperatorChain(new EmptyOperator())
-                        .build()) {
-            // Tests triggering checkpoint when all the inputs are alive.
-            Future<Boolean> checkpointFuture = triggerCheckpoint(testHarness, 2);
-            processMailTillCheckpointSucceeds(testHarness, checkpointFuture);
-            assertEquals(2, testHarness.getTaskStateManager().getReportedCheckpointId());
+            try (StreamTaskMailboxTestHarness<String> testHarness =
+                    new StreamTaskMailboxTestHarnessBuilder<>(
+                                    OneInputStreamTask::new, STRING_TYPE_INFO)
+                            .modifyStreamConfig(config -> config.setCheckpointingEnabled(false))
+                            .addInput(STRING_TYPE_INFO)
+                            .setupOperatorChain(new EmptyOperator())
+                            .setNumberOfNonChainedOutputs(1 + partitionWriters.length)
+                            .finishForSingletonOperatorChain(StringSerializer.INSTANCE)
+                            .addAdditionalOutput(partitionWriters)

Review comment:
       Can we avoid having a separate method `setNumberOfNonChainedOutputs()` in addition to `addAdditionalOutput()`? Could we not have only `addAdditionalOutput()`?
   
   Maybe we could move `addAdditionalOutput()` to the `StreamConfigChainer`? (I see this might be problematic because `StreamConfigChainer` doesn't know the type of the `OWNER`, so it wouldn't be able to pass additional outputs to the owner.)
   
   Or maybe we could enforce that `addAdditionalOutput()` is called before the `StreamConfigChainer` is created in `StreamTaskMailboxTestHarnessBuilder#setupOperatorChain(org.apache.flink.runtime.jobgraph.OperatorID, org.apache.flink.streaming.api.operators.StreamOperatorFactory<?>)`, and move `setNumberOfNonChainedOutputs` to the `StreamConfigChainer`'s constructor? We could also add a `checkState` in `addAdditionalOutput()` so that it can only be called before `setupOperatorChain()`, as in the sketch below. This would avoid the potential problem of `setNumberOfNonChainedOutputs` getting out of sync with `addAdditionalOutput`.
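   
   A minimal, self-contained sketch of that ordering idea (the class and method names below are illustrative stand-ins, not the actual `StreamTaskMailboxTestHarnessBuilder`/`StreamConfigChainer` API): the builder collects the additional outputs first, and the chainer gets the non-chained output count in its constructor, derived from what was registered, so the two values cannot drift apart.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for StreamTaskMailboxTestHarnessBuilder / StreamConfigChainer,
// illustrating only the proposed ordering constraint.
final class HarnessBuilderSketch {

    private final List<Object> additionalOutputs = new ArrayList<>();
    private ChainerSketch chainer;

    // Must be called before setupOperatorChain(); the check makes the order explicit.
    HarnessBuilderSketch addAdditionalOutput(Object... writers) {
        if (chainer != null) {
            throw new IllegalStateException(
                    "addAdditionalOutput() must be called before setupOperatorChain()");
        }
        for (Object writer : writers) {
            additionalOutputs.add(writer);
        }
        return this;
    }

    // The chainer receives the number of non-chained outputs in its constructor,
    // derived from the registered outputs (plus the chain's own output), so no
    // separate setNumberOfNonChainedOutputs() call is needed.
    ChainerSketch setupOperatorChain() {
        chainer = new ChainerSketch(1 + additionalOutputs.size());
        return chainer;
    }

    static final class ChainerSketch {
        private final int numberOfNonChainedOutputs;

        ChainerSketch(int numberOfNonChainedOutputs) {
            this.numberOfNonChainedOutputs = numberOfNonChainedOutputs;
        }

        int getNumberOfNonChainedOutputs() {
            return numberOfNonChainedOutputs;
        }
    }
}
```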

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTracker.java
##########
@@ -212,14 +233,57 @@ public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier)
     }
 
     @Override
-    public void processEndOfPartition() throws IOException {
-        while (!pendingCheckpoints.isEmpty()) {
-            CheckpointBarrierCount barrierCount = pendingCheckpoints.removeFirst();
-            if (barrierCount.markAborted()) {
-                notifyAbort(
-                        barrierCount.checkpointId(),
-                        new CheckpointException(
-                                CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
+    public void processEndOfPartition(InputChannelInfo channelInfo) throws IOException {

Review comment:
       nit: isn't this method too long? Shouldn't it be split?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
##########
@@ -205,6 +205,12 @@ public void resumeConsumption() {
         throw new UnsupportedOperationException("RecoveredInputChannel should never be blocked.");
     }
 
+    @Override
+    public void acknowledgeAllRecordsProcessed() throws IOException {
+        throw new UnsupportedOperationException(
+                "RecoveredInputChannel should not need acknowledge all records processed.");
+    }
+    }
+

Review comment:
       Is this the right thing to do? Or will it make unaligned checkpoints 
unusable with final checkpoint? What will happen if `EndOfUserRecordsEvent` has 
been emitted and sent downstream, but hasn't yet been processed, before 
unaligned checkpoint has completed? In that case we would recover 
`EndOfUserRecordsEvent` via `RecoveredInputChannel` and we would end up here, 
wouldn't we?
   
   Shouldn't we forward this message to 
`RemoteInputChannel`/`LocalInputChannel` after converting 
`RecoveredInputChannel` into one of those?
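   
   Roughly what I have in mind, as a self-contained sketch with hypothetical names (this is not the actual `RecoveredInputChannel`/`InputChannel` code, only the forwarding idea): the recovered channel remembers the acknowledgement if it arrives before the conversion and replays it on the real channel afterwards.

```java
// Hypothetical sketch, not the actual Flink classes: instead of throwing,
// the recovered channel could remember the acknowledgement and forward it
// to the real channel it is converted into once recovery has finished.
final class RecoveredChannelSketch {

    /** Minimal stand-in for the converted RemoteInputChannel/LocalInputChannel. */
    interface RealChannel {
        void acknowledgeAllRecordsProcessed();
    }

    private RealChannel converted;          // null until the conversion happens
    private boolean pendingAcknowledgement; // ack received while still recovering

    /** Called when the "all records processed" acknowledgement reaches this channel. */
    void acknowledgeAllRecordsProcessed() {
        if (converted != null) {
            converted.acknowledgeAllRecordsProcessed();
        } else {
            // Recovery still in progress: remember the ack instead of failing.
            pendingAcknowledgement = true;
        }
    }

    /** Called once the recovered channel has been swapped for the real one. */
    void onConvertedInto(RealChannel realChannel) {
        this.converted = realChannel;
        if (pendingAcknowledgement) {
            pendingAcknowledgement = false;
            realChannel.acknowledgeAllRecordsProcessed();
        }
    }
}
```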

##########
File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
##########
@@ -106,7 +106,8 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
             mainOperator.processElement(streamRecord);
         } else {
             mainOperator.endInput();
-            controller.allActionsCompleted();
+            controller.suspendDefaultAction();

Review comment:
       Is it necessary to suspend the default action, or is it just a precaution?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

