dawidwys commented on a change in pull request #15055:
URL: https://github.com/apache/flink/pull/15055#discussion_r656975368



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
##########
@@ -118,6 +121,8 @@ public static ByteBuffer toSerializedEvent(AbstractEvent 
event) throws IOExcepti
             buf.putInt(selector.getOutputSubtaskIndex());
             buf.flip();
             return buf;
+        } else if (eventClass == EndOfUserRecordsEvent.class) {

Review comment:
       nit: could we put it next to the `END_OF_CHANNEL_STATE_EVENT` branch? That 
would keep all the trivial cases close to each other and, at the same time, keep 
related events together.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
##########
@@ -190,6 +190,13 @@ public ResultPartitionType getPartitionType() {
 
     // ------------------------------------------------------------------------
 
+    @Override
+    public CompletableFuture<Void> getAllRecordsProcessedFuture() throws 
IOException {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    public void onSubpartitionAllRecordsProcessed(int subpartition) {}

Review comment:
       Could you add a javadoc here? At least pointing to 
`EndOfUserRecordsEvent`.
   
   I think that is enough.
   ```
       /**
        * @see org.apache.flink.runtime.io.network.api.EndOfUserRecordsEvent
        */
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
##########
@@ -190,6 +190,13 @@ public ResultPartitionType getPartitionType() {
 
     // ------------------------------------------------------------------------
 
+    @Override
+    public CompletableFuture<Void> getAllRecordsProcessedFuture() throws 
IOException {
+        return CompletableFuture.completedFuture(null);

Review comment:
       I'd rather throw `UnsupportedOperationException` by default. Otherwise, 
we are violating the contract of the method: we cannot claim we got an 
acknowledgement for something we have not sent.
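
   A minimal sketch of that default, assuming a simplified stand-in for the 
writer contract. `RecordsProcessedAware`, `PlainWriter` and `AckingWriter` are 
hypothetical names for illustration only, not Flink types:

   ```java
   import java.util.concurrent.CompletableFuture;

   // Hypothetical stand-in for the writer contract; not the real Flink interface.
   interface RecordsProcessedAware {
       // Default: a writer that never sends the end-of-records event has no
       // acknowledgement to report, so it refuses instead of returning a completed future.
       default CompletableFuture<Void> getAllRecordsProcessedFuture() {
           throw new UnsupportedOperationException(
                   getClass().getSimpleName() + " does not track record acknowledgements");
       }
   }

   // A writer without acknowledgement support simply inherits the throwing default.
   class PlainWriter implements RecordsProcessedAware {}

   // A writer with acknowledgement support hands out a real, initially incomplete future.
   class AckingWriter implements RecordsProcessedAware {
       private final CompletableFuture<Void> allRecordsProcessed = new CompletableFuture<>();

       @Override
       public CompletableFuture<Void> getAllRecordsProcessedFuture() {
           return allRecordsProcessed;
       }

       // Called when the downstream side has acknowledged all records.
       void onAllRecordsProcessed() {
           allRecordsProcessed.complete(null);
       }
   }
   ```

   With this shape, a caller that asks an unsupported writer for the future 
fails loudly instead of silently treating the records as acknowledged.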

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
##########
@@ -629,6 +632,46 @@ public void 
testBroadcastRecordWithRecordSpanningMultipleBuffers() throws Except
         }
     }
 
+    @Test
+    public void testWaitForAllRecordProcessed() throws IOException {
+        // Creates a result partition with 2 channels.
+        BufferWritingResultPartition bufferWritingResultPartition =
+                createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
+
+        // When acquiring the future for the first time, it would
+        // broadcast the EndOfUserRecordsEvent.

Review comment:
       This is an indication of an unclear contract of the method I mentioned 
above.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -695,6 +697,27 @@ protected void afterInvoke() throws Exception {
         // close all operators in a chain effect way
         operatorChain.closeOperators(actionExecutor);
 
+        // If checkpoints are enabled, waits for all the records get processed 
by the downstream
+        // tasks. During this process, this task could coordinate with its 
downstream tasks to
+        // continue perform checkpoints.
+        if (configuration.isCheckpointingEnabled()) {
+            LOG.debug("Waiting for all the records processed by the downstream 
tasks.");
+            CompletableFuture<Void> combineFuture =
+                    FutureUtils.waitForAll(
+                            Arrays.stream(getEnvironment().getAllWriters())
+                                    .map(
+                                            FunctionUtils.uncheckedFunction(
+                                                    ResultPartitionWriter
+                                                            
::getAllRecordsProcessedFuture))
+                                    .collect(Collectors.toList()));
+
+            MailboxExecutor mailboxExecutor =
+                    
mailboxProcessor.getMailboxExecutor(TaskMailbox.MIN_PRIORITY);
+            while (!combineFuture.isDone()) {

Review comment:
       I am not sure about this piece of code. We will end up busy looping 
waiting for the acknowledgement.
   
   I am wondering if we should introduce something like:
   
   ```
   while (!combineFuture.isDone()) {
       CompletableFuture<Boolean> mailReceived = mailboxExecutor.hasMail();
       FutureUtils.waitForEither(mailReceived, combineFuture).get();
       if (mailReceived.isDone() && mailReceived.get()) {
           mailboxExecutor.yield();
       }
   }
   ```
   
   @pnowojski WDYT?
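
   To make the idea concrete outside of Flink, here is a rough, JDK-only 
sketch. `TinyMailbox`, its `hasMail()` signal and `AckWaiter` are hypothetical 
helpers invented for this illustration, and `CompletableFuture.anyOf` stands in 
for the "wait for either" step; none of this is existing Flink API:

   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;
   import java.util.concurrent.CompletableFuture;

   // Hypothetical mini-mailbox exposing a "mail is available" future.
   class TinyMailbox {
       private final Queue<Runnable> mails = new ArrayDeque<>();
       private CompletableFuture<Void> mailAvailable = new CompletableFuture<>();

       synchronized void put(Runnable mail) {
           mails.add(mail);
           mailAvailable.complete(null);
       }

       // Completes once at least one mail is queued.
       synchronized CompletableFuture<Void> hasMail() {
           return mailAvailable;
       }

       // Runs one queued mail, if any, and re-arms the signal when the queue drains.
       void runOne() {
           Runnable mail;
           synchronized (this) {
               mail = mails.poll();
               if (mails.isEmpty()) {
                   mailAvailable = new CompletableFuture<>();
               }
           }
           if (mail != null) {
               mail.run();
           }
       }
   }

   class AckWaiter {
       // Blocks until allAcked completes, running mails as they arrive instead of spinning.
       // Returns the number of loop iterations that handled a mail signal.
       static int waitForAcks(TinyMailbox mailbox, CompletableFuture<Void> allAcked) {
           int processed = 0;
           while (!allAcked.isDone()) {
               CompletableFuture<Void> mail = mailbox.hasMail();
               // Parks the thread until either side makes progress.
               CompletableFuture.anyOf(mail, allAcked).join();
               if (mail.isDone()) {
                   mailbox.runOne();
                   processed++;
               }
           }
           return processed;
       }
   }
   ```

   The thread only wakes up when a mail arrives or the acknowledgement future 
completes, so checkpoints triggered through the mailbox can still be served 
while waiting.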

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -852,49 +854,74 @@ public void testLatencyMarker() throws Exception {
 
     @Test
     public void testRpcTriggerCheckpointWithoutSourceChain() throws Exception {
-        AtomicReference<Future<?>> lastCheckpointTriggerFuture = new 
AtomicReference<>();
+        ResultPartition[] partitionWriters = new ResultPartition[2];
+        try {
+            for (int i = 0; i < partitionWriters.length; ++i) {
+                partitionWriters[i] =
+                        
PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
+                partitionWriters[i].setup();
+            }
 
-        try (StreamTaskMailboxTestHarness<String> testHarness =
-                new StreamTaskMailboxTestHarnessBuilder<>(
-                                env ->
-                                        new 
HoldingOnAfterInvokeMultipleInputStreamTask(
-                                                env, 
lastCheckpointTriggerFuture),
-                                BasicTypeInfo.STRING_TYPE_INFO)
-                        .addInput(BasicTypeInfo.STRING_TYPE_INFO)
-                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
-                        .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
-                        .modifyStreamConfig(config -> 
config.setCheckpointingEnabled(true))
-                        .setupOperatorChain(new 
MapToStringMultipleInputOperatorFactory(3))
-                        
.finishForSingletonOperatorChain(StringSerializer.INSTANCE)
-                        .build()) {
+            try (StreamTaskMailboxTestHarness<String> testHarness =
+                    new StreamTaskMailboxTestHarnessBuilder<>(
+                                    MultipleInputStreamTask::new, 
BasicTypeInfo.STRING_TYPE_INFO)
+                            .addInput(BasicTypeInfo.STRING_TYPE_INFO)
+                            .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                            .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
+                            .modifyStreamConfig(config -> 
config.setCheckpointingEnabled(true))
+                            .setupOperatorChain(new 
MapToStringMultipleInputOperatorFactory(3))
+                            .setNumberOfNonChainedOutputs(1 + 
partitionWriters.length)
+                            
.finishForSingletonOperatorChain(StringSerializer.INSTANCE)
+                            .addAdditionalOutput(partitionWriters)
+                            .build()) {
 
-            testHarness
-                    .getStreamTask()
-                    .getCheckpointCoordinator()
-                    .setEnableCheckpointAfterTasksFinished(true);
-
-            // Tests triggering checkpoint when all the inputs are alive.
-            Future<Boolean> checkpointFuture = triggerCheckpoint(testHarness, 
2);
-            processMailTillCheckpointSuccess(testHarness, checkpointFuture);
-            assertEquals(2, 
testHarness.getTaskStateManager().getReportedCheckpointId());
-
-            // Tests trigger checkpoint after some inputs have received 
EndOfPartition
-            testHarness.processEvent(EndOfPartitionEvent.INSTANCE, 0, 0);
-            checkpointFuture = triggerCheckpoint(testHarness, 4);
-            processMailTillCheckpointSuccess(testHarness, checkpointFuture);
-            assertEquals(4, 
testHarness.getTaskStateManager().getReportedCheckpointId());
-
-            // Tests trigger checkpoint after all the inputs have received 
EndOfPartition.
-            testHarness.processEvent(EndOfPartitionEvent.INSTANCE, 1, 0);
-            testHarness.processEvent(EndOfPartitionEvent.INSTANCE, 2, 0);
-            checkpointFuture = triggerCheckpoint(testHarness, 6);
-            lastCheckpointTriggerFuture.set(checkpointFuture);
-
-            // The checkpoint 6 would be triggered successfully.
-            // TODO: Would also check the checkpoint succeed after we also 
waiting
-            // for the asynchronous step to finish on finish.
-            testHarness.finishProcessing();
-            assertTrue(checkpointFuture.isDone());
+                testHarness
+                        .getStreamTask()
+                        .getCheckpointCoordinator()
+                        .setEnableCheckpointAfterTasksFinished(true);
+
+                // Tests triggering checkpoint when all the inputs are alive.
+                Future<Boolean> checkpointFuture = 
triggerCheckpoint(testHarness, 2);
+                processMailTillCheckpointSuccess(testHarness, 
checkpointFuture);
+                assertEquals(2, 
testHarness.getTaskStateManager().getReportedCheckpointId());
+
+                // Tests triggering checkpoint after some inputs have received 
EndOfPartition.
+                testHarness.processEvent(EndOfPartitionEvent.INSTANCE, 0, 0);
+                checkpointFuture = triggerCheckpoint(testHarness, 4);
+                processMailTillCheckpointSuccess(testHarness, 
checkpointFuture);
+                assertEquals(4, 
testHarness.getTaskStateManager().getReportedCheckpointId());
+
+                // Simulates the netty thread reports the downstream tasks 
have processed all the
+                // records.
+                new Thread(

Review comment:
       Could we replace spawning a separate thread with a mock 
`ResultPartition`? Having concurrency in tests is usually a straight path to 
instabilities.
   
   You could, e.g., reuse the `MockResultPartitionWriter` and reimplement the 
methods of interest.
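
   Roughly what I have in mind, as a self-contained sketch. The 
`AckingPartitionWriter` interface, `ControllableWriter` and `Writers` are 
hypothetical and only illustrate the pattern; in the real test the double would 
extend the existing mock writer instead:

   ```java
   import java.util.concurrent.CompletableFuture;

   // Hypothetical minimal writer contract used only for this sketch.
   interface AckingPartitionWriter {
       CompletableFuture<Void> getAllRecordsProcessedFuture();
   }

   // Hand-written test double: the test, not a background thread, decides when the ack arrives.
   class ControllableWriter implements AckingPartitionWriter {
       private final CompletableFuture<Void> future = new CompletableFuture<>();

       @Override
       public CompletableFuture<Void> getAllRecordsProcessedFuture() {
           return future;
       }

       // Simulates the downstream acknowledgement synchronously on the test thread.
       void acknowledge() {
           future.complete(null);
       }
   }

   class Writers {
       // Combines the per-writer futures, similar in spirit to waiting for all writers.
       static CompletableFuture<Void> allProcessed(AckingPartitionWriter... writers) {
           CompletableFuture<?>[] futures = new CompletableFuture<?>[writers.length];
           for (int i = 0; i < writers.length; i++) {
               futures[i] = writers[i].getAllRecordsProcessedFuture();
           }
           return CompletableFuture.allOf(futures);
       }
   }
   ```

   The test can then call `acknowledge()` at exactly the point where it wants 
the downstream ack to be observed, which keeps the ordering deterministic.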

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfUserRecordsEvent.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+import java.io.IOException;
+
+/** This event marks a subpartition's user records as fully consumed. */

Review comment:
       Could we extend this javadoc slightly? Something along these lines:
   
   ```
   /**
    * This event indicates there will be no more data records in a subpartition. There still
    * might be other events, in particular {@link CheckpointBarrier CheckpointBarriers}
    * traveling. The {@link EndOfUserRecordsEvent} is acknowledged by the downstream task. That
    * way we can safely assume the downstream task has consumed all the produced records and
    * therefore we can perform a final checkpoint for the upstream task.
    *
    * @see <a href="https://cwiki.apache.org/confluence/x/mw-ZCQ">FLIP-147</a>
    */
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
##########
@@ -629,6 +632,46 @@ public void 
testBroadcastRecordWithRecordSpanningMultipleBuffers() throws Except
         }
     }
 
+    @Test
+    public void testWaitForAllRecordProcessed() throws IOException {
+        // Creates a result partition with 2 channels.
+        BufferWritingResultPartition bufferWritingResultPartition =
+                createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
+
+        // When acquiring the future for the first time, it would
+        // broadcast the EndOfUserRecordsEvent.
+        CompletableFuture<Void> allRecordsProcessedFuture =
+                bufferWritingResultPartition.getAllRecordsProcessedFuture();
+        assertFalse(allRecordsProcessedFuture.isDone());
+        for (ResultSubpartition resultSubpartition : 
bufferWritingResultPartition.subpartitions) {
+            assertEquals(1, resultSubpartition.getTotalNumberOfBuffers());
+            Buffer nextBuffer = ((PipelinedSubpartition) 
resultSubpartition).pollBuffer().buffer();
+            assertFalse(nextBuffer.isBuffer());
+            assertEquals(
+                    EndOfUserRecordsEvent.INSTANCE,
+                    EventSerializer.fromBuffer(nextBuffer, 
getClass().getClassLoader()));
+        }
+
+        // When acquiring the future for the first time, it would

Review comment:
       second time? Still I think we should simply fix the contract ;)

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
##########
@@ -164,6 +192,35 @@ public void flush(int targetSubpartition) {
         flushSubpartition(targetSubpartition, false);
     }
 
+    @Override
+    public CompletableFuture<Void> getAllRecordsProcessedFuture() throws 
IOException {
+        synchronized (lock) {
+            if (allRecordsProcessedFuture == null) {
+                allRecordsProcessedFuture = new CompletableFuture<>();
+                broadcastEvent(EndOfUserRecordsEvent.INSTANCE, false);

Review comment:
       I'd rather not combine retrieving the future with broadcasting an event. 
That's an indirect additional contract to the method. 
   
   What do you think about:
   
   ```
       @GuardedBy("lock")
       private final CompletableFuture<Void> allRecordsProcessedFuture =
               new CompletableFuture<Void>();
   
       @Override
       public CompletableFuture<Void> getAllRecordsProcessedFuture() throws IOException {
           return allRecordsProcessedFuture;
       }
   
       @Override
       public void onSubpartitionAllRecordsProcessed(int subpartition) {
           synchronized (lock) {
               if (allRecordsProcessedSubpartitions[subpartition]) {
                   return;
               }
   
               allRecordsProcessedSubpartitions[subpartition] = true;
               numNotAllRecordsProcessedSubpartitions--;
   
               if (numNotAllRecordsProcessedSubpartitions == 0) {
                   allRecordsProcessedFuture.complete(null);
               }
           }
       }
   ```
   
   This way, calling `getAllRecordsProcessedFuture` carries no indirect 
contract. Another approach would be to rename the method to something like 
`notifyNoMoreRecords`, since broadcasting the event is the part of the function 
with the more severe consequences.
   
   I am not sure about the purpose of the lazy initialization. However, if you 
think it is necessary, I'd do it in either of the two methods, whichever comes 
first.
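
   For illustration, a standalone sketch that combines both ideas: an explicit 
notify method for the side effect and a plain getter for the future. 
`SketchPartition` is a hypothetical simplification, the `broadcastLog` list 
merely stands in for `broadcastEvent(...)`, and the send-at-most-once guard is 
my assumption rather than something the PR defines:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   // Hypothetical simplified partition; names mirror the review, but this is not Flink code.
   class SketchPartition {
       private final Object lock = new Object();
       private final CompletableFuture<Void> allRecordsProcessedFuture = new CompletableFuture<>();
       private final boolean[] allRecordsProcessedSubpartitions;
       private int numNotAllRecordsProcessedSubpartitions;
       private boolean endOfRecordsSent;

       // Stands in for broadcastEvent(...); records what would have been sent.
       final List<String> broadcastLog = new ArrayList<>();

       SketchPartition(int numSubpartitions) {
           allRecordsProcessedSubpartitions = new boolean[numSubpartitions];
           numNotAllRecordsProcessedSubpartitions = numSubpartitions;
       }

       // Explicit side effect: announce the end of user records, at most once (assumption).
       void notifyNoMoreRecords() {
           synchronized (lock) {
               if (!endOfRecordsSent) {
                   endOfRecordsSent = true;
                   broadcastLog.add("EndOfUserRecordsEvent");
               }
           }
       }

       // Plain getter: no broadcasting, no lazy initialization.
       CompletableFuture<Void> getAllRecordsProcessedFuture() {
           return allRecordsProcessedFuture;
       }

       // Duplicate acknowledgements for the same subpartition are ignored.
       void onSubpartitionAllRecordsProcessed(int subpartition) {
           synchronized (lock) {
               if (allRecordsProcessedSubpartitions[subpartition]) {
                   return;
               }
               allRecordsProcessedSubpartitions[subpartition] = true;
               if (--numNotAllRecordsProcessedSubpartitions == 0) {
                   allRecordsProcessedFuture.complete(null);
               }
           }
       }
   }
   ```

   Fetching the future any number of times has no effect on the subpartitions, 
and the test no longer needs comments about what the first or second call does.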

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
##########
@@ -123,6 +124,10 @@ else if (msgClazz == TaskEventRequest.class) {
 
                 outboundQueue.addCreditOrResumeConsumption(
                         request.receiverId, 
NetworkSequenceViewReader::resumeConsumption);
+            } else if (msgClazz == 
NettyMessage.AckAllUserRecordsProcessed.class) {

Review comment:
       nit: Add a static import for `AckAllUserRecordsProcessed`

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
##########
@@ -237,51 +241,76 @@ public void testOnlyOneSource() throws Exception {
 
     @Test
     public void testRpcTriggerCheckpointWithSourceChain() throws Exception {
-        AtomicReference<Future<?>> lastCheckpointTriggerFuture = new 
AtomicReference<>();
-
-        try (StreamTaskMailboxTestHarness<String> testHarness =
-                new StreamTaskMailboxTestHarnessBuilder<>(
-                                env ->
-                                        new MultipleInputStreamTaskTest
-                                                
.HoldingOnAfterInvokeMultipleInputStreamTask(
-                                                env, 
lastCheckpointTriggerFuture),
-                                BasicTypeInfo.STRING_TYPE_INFO)
-                        .modifyStreamConfig(config -> 
config.setCheckpointingEnabled(true))
-                        
.modifyExecutionConfig(ExecutionConfig::enableObjectReuse)
-                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
-                        .addInput(BasicTypeInfo.STRING_TYPE_INFO)
-                        .addSourceInput(
-                                new SourceOperatorFactory<>(
-                                        new 
MultipleInputStreamTaskTest.LifeCycleTrackingMockSource(
-                                                Boundedness.BOUNDED, 1),
-                                        WatermarkStrategy.noWatermarks()))
-                        .addSourceInput(
-                                new SourceOperatorFactory<>(
-                                        new 
MultipleInputStreamTaskTest.LifeCycleTrackingMockSource(
-                                                Boundedness.BOUNDED, 1),
-                                        WatermarkStrategy.noWatermarks()))
-                        .setupOperatorChain(new 
MapToStringMultipleInputOperatorFactory(4))
-                        
.finishForSingletonOperatorChain(StringSerializer.INSTANCE)
-                        .build()) {
-
-            testHarness
-                    .getStreamTask()
-                    .getCheckpointCoordinator()
-                    .setEnableCheckpointAfterTasksFinished(true);
-
-            // TODO: Would add the test of part of channel finished after we 
are able to
-            // complement pending checkpoints when received 
EndOfPartitionEvent.
-
-            testHarness.processEvent(EndOfPartitionEvent.INSTANCE, 0, 0);
-            testHarness.processEvent(EndOfPartitionEvent.INSTANCE, 1, 0);
-            Future<Boolean> checkpointFuture = triggerCheckpoint(testHarness, 
4);
-            lastCheckpointTriggerFuture.set(checkpointFuture);
-
-            // The checkpoint 4 would be triggered successfully.
-            // TODO: Would also check the checkpoint succeed after we also 
waiting
-            // for the asynchronous step to finish on finish.
-            testHarness.finishProcessing();
-            assertTrue(checkpointFuture.isDone());
+        ResultPartition[] partitionWriters = new ResultPartition[2];

Review comment:
       Maybe a stupid question, but how are the `partitionWriters` connected to 
the `StreamTask` under the test?



