tillrohrmann commented on a change in pull request #16432:
URL: https://github.com/apache/flink/pull/16432#discussion_r666216609



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/NonSuccessFuturesTrack.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This trackers remember the CompletableFutures if they are not complete 
normally. It also allows

Review comment:
       ```suggestion
    * This tracker remembers the CompletableFutures if they are completed 
exceptionally. It also allows
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/IncompleteFuturesTracker.java
##########
@@ -108,4 +94,22 @@ void removeFromSet(CompletableFuture<?> future) {
             lock.unlock();
         }
     }
+
+    @VisibleForTesting
+    Collection<CompletableFuture<?>> getCurrentIncomplete() {

Review comment:
       ```suggestion
       Collection<CompletableFuture<?>> getCurrentIncompleteAndReset() {
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
##########
@@ -370,6 +373,82 @@ private void checkpointEventValueAtomicity(
         }
     }
 
+    @Test
+    public void testFailingCheckpointsIfSendingEventFailed() throws Exception {
+        CompletableFuture<Acknowledge> eventSendingResult = new 
CompletableFuture<>();
+        final EventReceivingTasks tasks =
+                
EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
+        final OperatorCoordinatorHolder holder =
+                createCoordinatorHolder(tasks, 
TestingOperatorCoordinator::new);
+
+        // Send one event without finishing it.
+        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new 
TestOperatorEvent(0));
+
+        // Triggers one checkpoint.
+        CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>();
+        holder.checkpointCoordinator(1, checkpointResult);
+        getCoordinator(holder).getLastTriggeredCheckpoint().complete(new 
byte[0]);
+
+        // Fails the event sending.
+        eventSendingResult.completeExceptionally(new 
RuntimeException("Artificial"));
+
+        assertTrue(eventSendingResult.isCompletedExceptionally());
+    }
+
+    @Test
+    public void testFailingCheckpointIfFailedEventNotProcessed() throws 
Exception {
+        final ReorderableManualExecutorService executor = new 
ReorderableManualExecutorService();
+        final ComponentMainThreadExecutor mainThreadExecutor =
+                new ComponentMainThreadExecutorServiceAdapter(
+                        (ScheduledExecutorService) executor, 
Thread.currentThread());
+
+        CompletableFuture<Acknowledge> eventSendingResult = new 
CompletableFuture<>();
+        final EventReceivingTasks tasks =
+                
EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
+
+        final OperatorCoordinatorHolder holder =
+                createCoordinatorHolder(tasks, 
TestingOperatorCoordinator::new, mainThreadExecutor);
+
+        // Send one event without finishing it.
+        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new 
TestOperatorEvent(0));
+        executor.triggerAll();
+
+        // Finish the event sending. This will insert one runnable that handle
+        // failed events to the executor. And we pending this runnable to
+        // simulates checkpoints triggered before the failure get processed.
+        executor.setPendingNewRunnables(true);
+        eventSendingResult.completeExceptionally(new 
RuntimeException("Artificial"));
+        executor.setPendingNewRunnables(false);
+
+        // Triggers one checkpoint.
+        CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>();
+        holder.checkpointCoordinator(1, checkpointResult);
+        executor.triggerAll();
+        getCoordinator(holder).getLastTriggeredCheckpoint().complete(new 
byte[0]);
+        executor.triggerAll();
+
+        assertTrue(checkpointResult.isCompletedExceptionally());
+    }
+
+    @Test
+    public void testCheckpointSuccessAfterFailedEventProcessed() throws 
Exception {
+        final EventReceivingTasks tasks =
+                EventReceivingTasks.createForRunningTasksFailingRpcs(
+                        new RuntimeException("Artificial"));
+        final OperatorCoordinatorHolder holder =
+                createCoordinatorHolder(tasks, 
TestingOperatorCoordinator::new);
+
+        // Send one event and finish it. This will also cause the failure get 
process immediately.

Review comment:
       ```suggestion
           // Send one event and finish it. This will also process the failure 
immediately.
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
##########
@@ -446,6 +525,27 @@ public OperatorCoordinator 
create(OperatorCoordinator.Context context) {
         return holder;
     }
 
+    private static class ReorderableManualExecutorService
+            extends ManuallyTriggeredScheduledExecutorService {
+
+        private boolean pendingNewRunnables;
+
+        private final Queue<Runnable> pendingRunnables = new ArrayDeque<>();
+
+        public void setPendingNewRunnables(boolean pendingNewRunnables) {

Review comment:
       Maybe a better name would be `ignoreExecuteCalls(boolean)` or so.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/NonSuccessFuturesTrack.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This trackers remember the CompletableFutures if they are not complete 
normally. It also allows
+ * the callers to remove the failed ones after they have handled the future.
+ */
+public class NonSuccessFuturesTrack {

Review comment:
       But I see why you have chosen `nonSuccess`, because it means either 
incomplete or failed.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
##########
@@ -370,6 +373,82 @@ private void checkpointEventValueAtomicity(
         }
     }
 
+    @Test
+    public void testFailingCheckpointsIfSendingEventFailed() throws Exception {
+        CompletableFuture<Acknowledge> eventSendingResult = new 
CompletableFuture<>();
+        final EventReceivingTasks tasks =
+                
EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
+        final OperatorCoordinatorHolder holder =
+                createCoordinatorHolder(tasks, 
TestingOperatorCoordinator::new);
+
+        // Send one event without finishing it.
+        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new 
TestOperatorEvent(0));
+
+        // Triggers one checkpoint.
+        CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>();
+        holder.checkpointCoordinator(1, checkpointResult);
+        getCoordinator(holder).getLastTriggeredCheckpoint().complete(new 
byte[0]);
+
+        // Fails the event sending.
+        eventSendingResult.completeExceptionally(new 
RuntimeException("Artificial"));
+
+        assertTrue(eventSendingResult.isCompletedExceptionally());
+    }

Review comment:
       Should the last assert be 
`assertTrue(checkpointResult.isCompletedExceptionally());`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/NonSuccessFuturesTrack.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This trackers remember the CompletableFutures if they are not complete 
normally. It also allows
+ * the callers to remove the failed ones after they have handled the future.
+ */
+public class NonSuccessFuturesTrack {

Review comment:
       And we still have the `IncompleteFuturesTracker` in the code base.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/NonSuccessFuturesTrack.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This trackers remember the CompletableFutures if they are not complete 
normally. It also allows
+ * the callers to remove the failed ones after they have handled the future.

Review comment:
       ```suggestion
    * the callers to remove the failed futures.
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
##########
@@ -370,6 +373,82 @@ private void checkpointEventValueAtomicity(
         }
     }
 
+    @Test
+    public void testFailingCheckpointsIfSendingEventFailed() throws Exception {

Review comment:
       ```suggestion
       public void testFailingCheckpointIfSendingEventFailed() throws Exception 
{
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/util/NonSuccessFuturesTrackerTest.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination.util;
+
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Unit tests for the {@link IncompleteFuturesTracker}. */
+public class NonSuccessFuturesTrackerTest {

Review comment:
       ```suggestion
   public class NonSuccessFuturesTrackerTest extends TestLogger {
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
##########
@@ -446,6 +525,27 @@ public OperatorCoordinator 
create(OperatorCoordinator.Context context) {
         return holder;
     }
 
+    private static class ReorderableManualExecutorService
+            extends ManuallyTriggeredScheduledExecutorService {
+
+        private boolean pendingNewRunnables;
+
+        private final Queue<Runnable> pendingRunnables = new ArrayDeque<>();
+
+        public void setPendingNewRunnables(boolean pendingNewRunnables) {
+            this.pendingNewRunnables = pendingNewRunnables;
+        }
+
+        @Override
+        public void execute(@Nonnull Runnable command) {
+            if (pendingNewRunnables) {
+                pendingRunnables.add(command);

Review comment:
       Will the `Runnables` in `pendingRunnables` ever be executed or will they 
simply be ignored?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
##########
@@ -370,6 +373,82 @@ private void checkpointEventValueAtomicity(
         }
     }
 
+    @Test
+    public void testFailingCheckpointsIfSendingEventFailed() throws Exception {
+        CompletableFuture<Acknowledge> eventSendingResult = new 
CompletableFuture<>();
+        final EventReceivingTasks tasks =
+                
EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
+        final OperatorCoordinatorHolder holder =
+                createCoordinatorHolder(tasks, 
TestingOperatorCoordinator::new);
+
+        // Send one event without finishing it.
+        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new 
TestOperatorEvent(0));
+
+        // Triggers one checkpoint.
+        CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>();
+        holder.checkpointCoordinator(1, checkpointResult);
+        getCoordinator(holder).getLastTriggeredCheckpoint().complete(new 
byte[0]);
+
+        // Fails the event sending.
+        eventSendingResult.completeExceptionally(new 
RuntimeException("Artificial"));
+
+        assertTrue(eventSendingResult.isCompletedExceptionally());
+    }

Review comment:
       What exactly is this test testing? To me it looks as if we send 
an event, trigger a checkpoint, complete this checkpoint, fail the 
`eventSendingResult`, and then assert that `eventSendingResult` was indeed 
completed exceptionally.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
##########
@@ -370,6 +373,82 @@ private void checkpointEventValueAtomicity(
         }
     }
 
+    @Test
+    public void testFailingCheckpointsIfSendingEventFailed() throws Exception {
+        CompletableFuture<Acknowledge> eventSendingResult = new 
CompletableFuture<>();
+        final EventReceivingTasks tasks =
+                
EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
+        final OperatorCoordinatorHolder holder =
+                createCoordinatorHolder(tasks, 
TestingOperatorCoordinator::new);
+
+        // Send one event without finishing it.
+        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new 
TestOperatorEvent(0));
+
+        // Triggers one checkpoint.
+        CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>();
+        holder.checkpointCoordinator(1, checkpointResult);
+        getCoordinator(holder).getLastTriggeredCheckpoint().complete(new 
byte[0]);
+
+        // Fails the event sending.
+        eventSendingResult.completeExceptionally(new 
RuntimeException("Artificial"));
+
+        assertTrue(eventSendingResult.isCompletedExceptionally());
+    }
+
+    @Test
+    public void testFailingCheckpointIfFailedEventNotProcessed() throws 
Exception {
+        final ReorderableManualExecutorService executor = new 
ReorderableManualExecutorService();
+        final ComponentMainThreadExecutor mainThreadExecutor =
+                new ComponentMainThreadExecutorServiceAdapter(
+                        (ScheduledExecutorService) executor, 
Thread.currentThread());
+
+        CompletableFuture<Acknowledge> eventSendingResult = new 
CompletableFuture<>();
+        final EventReceivingTasks tasks =
+                
EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
+
+        final OperatorCoordinatorHolder holder =
+                createCoordinatorHolder(tasks, 
TestingOperatorCoordinator::new, mainThreadExecutor);
+
+        // Send one event without finishing it.
+        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new 
TestOperatorEvent(0));
+        executor.triggerAll();
+
+        // Finish the event sending. This will insert one runnable that handle
+        // failed events to the executor. And we pending this runnable to

Review comment:
       What do you mean by "we pending this runnable"?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/NonSuccessFuturesTrack.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This trackers remember the CompletableFutures if they are not complete 
normally. It also allows
+ * the callers to remove the failed ones after they have handled the future.
+ */
+public class NonSuccessFuturesTrack {
+
+    private final ReentrantLock lock = new ReentrantLock();
+
+    private final HashSet<CompletableFuture<?>> nonSuccessFutures = new 
HashSet<>();
+
+    public void trackFuture(CompletableFuture<?> future) {
+        if (future.isDone() && !future.isCompletedExceptionally()) {
+            return;
+        }
+
+        lock.lock();
+        try {
+            nonSuccessFutures.add(future);
+        } finally {
+            lock.unlock();
+        }
+
+        future.whenComplete(
+                (success, failure) -> {
+                    if (failure == null) {
+                        removeFromSet(future);
+                    }
+                });
+    }
+
+    public Collection<CompletableFuture<?>> getNonSuccessFutures() {
+        lock.lock();
+        try {
+            if (nonSuccessFutures.isEmpty()) {
+                return Collections.emptySet();
+            }
+
+            return new ArrayList<>(nonSuccessFutures);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void removeFailedFuture(CompletableFuture<?> future) {
+        checkState(future.isCompletedExceptionally(), "The future is not 
fail");

Review comment:
       ```suggestion
           checkArgument(future.isCompletedExceptionally(), "The future is not 
failed");
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
##########
@@ -72,19 +79,28 @@
         FutureUtils.assertNoException(
                 result.handleAsync(
                         (success, failure) -> {
-                            if (failure != null && 
subtaskAccess.isStillRunning()) {
-                                String msg =
-                                        String.format(
-                                                EVENT_LOSS_ERROR_MESSAGE,
-                                                evt,
-                                                subtaskAccess.subtaskName());
-                                subtaskAccess.triggerTaskFailover(new 
FlinkException(msg, failure));
+                            if (failure != null) {
+                                if (subtaskAccess.isStillRunning()) {
+                                    String msg =
+                                            String.format(
+                                                    EVENT_LOSS_ERROR_MESSAGE,
+                                                    evt,
+                                                    
subtaskAccess.subtaskName());
+                                    subtaskAccess.triggerTaskFailover(
+                                            new FlinkException(msg, failure));
+                                }
+
+                                
nonSuccessFuturesTrack.removeFailedFuture(result);

Review comment:
       I am wondering whether it wouldn't be simpler to change 
`result.handleAsync` to `result.whenCompleteAsync` and then to add the result 
of this operation to the `incompleteFuturesTracker`? That way we are sure that 
we will have handled the result before doing any other operations (e.g. 
failing/completing checkpoints).

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/NonSuccessFuturesTrack.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This trackers remember the CompletableFutures if they are not complete 
normally. It also allows
+ * the callers to remove the failed ones after they have handled the future.
+ */
+public class NonSuccessFuturesTrack {

Review comment:
       ```suggestion
   public class NonSuccessFuturesTracker {
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
##########
@@ -370,6 +373,82 @@ private void checkpointEventValueAtomicity(
         }
     }
 
+    @Test
+    public void testFailingCheckpointsIfSendingEventFailed() throws Exception {
+        CompletableFuture<Acknowledge> eventSendingResult = new 
CompletableFuture<>();
+        final EventReceivingTasks tasks =
+                
EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
+        final OperatorCoordinatorHolder holder =
+                createCoordinatorHolder(tasks, 
TestingOperatorCoordinator::new);
+
+        // Send one event without finishing it.
+        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new 
TestOperatorEvent(0));
+
+        // Triggers one checkpoint.
+        CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>();
+        holder.checkpointCoordinator(1, checkpointResult);
+        getCoordinator(holder).getLastTriggeredCheckpoint().complete(new 
byte[0]);
+
+        // Fails the event sending.
+        eventSendingResult.completeExceptionally(new 
RuntimeException("Artificial"));
+
+        assertTrue(eventSendingResult.isCompletedExceptionally());
+    }
+
+    @Test
+    public void testFailingCheckpointIfFailedEventNotProcessed() throws 
Exception {
+        final ReorderableManualExecutorService executor = new 
ReorderableManualExecutorService();
+        final ComponentMainThreadExecutor mainThreadExecutor =
+                new ComponentMainThreadExecutorServiceAdapter(
+                        (ScheduledExecutorService) executor, 
Thread.currentThread());
+
+        CompletableFuture<Acknowledge> eventSendingResult = new 
CompletableFuture<>();
+        final EventReceivingTasks tasks =
+                
EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
+
+        final OperatorCoordinatorHolder holder =
+                createCoordinatorHolder(tasks, 
TestingOperatorCoordinator::new, mainThreadExecutor);
+
+        // Send one event without finishing it.
+        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new 
TestOperatorEvent(0));
+        executor.triggerAll();
+
+        // Finish the event sending. This will insert one runnable that handle

Review comment:
       ```suggestion
           // Finish the event sending. This will insert one runnable that 
handles
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/NonSuccessFuturesTrack.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This trackers remember the CompletableFutures if they are not complete 
normally. It also allows
+ * the callers to remove the failed ones after they have handled the future.
+ */
+public class NonSuccessFuturesTrack {

Review comment:
       Maybe call this class `IncompleteFuturesTracker` since it tracks futures 
until they are either successfully completed or until the failed futures are 
manually removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to