tillrohrmann commented on a change in pull request #16432: URL: https://github.com/apache/flink/pull/16432#discussion_r666216609
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/NonSuccessFuturesTrack.java ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.coordination.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This trackers remember the CompletableFutures if they are not complete normally. It also allows Review comment: ```suggestion * This tracker remembers the CompletableFutures if they are completed exceptionally. It also allows ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/IncompleteFuturesTracker.java ########## @@ -108,4 +94,22 @@ void removeFromSet(CompletableFuture<?> future) { lock.unlock(); } } + + @VisibleForTesting + Collection<CompletableFuture<?>> getCurrentIncomplete() { Review comment: ```suggestion Collection<CompletableFuture<?>> getCurrentIncompleteAndReset() { ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java ########## @@ -370,6 +373,82 @@ private void checkpointEventValueAtomicity( } } + @Test + public void testFailingCheckpointsIfSendingEventFailed() throws Exception { + CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<>(); + final EventReceivingTasks tasks = + EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult); + final OperatorCoordinatorHolder holder = + createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); + + // Send one event without finishing it. + getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0)); + + // Triggers one checkpoint. + CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>(); + holder.checkpointCoordinator(1, checkpointResult); + getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]); + + // Fails the event sending. + eventSendingResult.completeExceptionally(new RuntimeException("Artificial")); + + assertTrue(eventSendingResult.isCompletedExceptionally()); + } + + @Test + public void testFailingCheckpointIfFailedEventNotProcessed() throws Exception { + final ReorderableManualExecutorService executor = new ReorderableManualExecutorService(); + final ComponentMainThreadExecutor mainThreadExecutor = + new ComponentMainThreadExecutorServiceAdapter( + (ScheduledExecutorService) executor, Thread.currentThread()); + + CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<>(); + final EventReceivingTasks tasks = + EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult); + + final OperatorCoordinatorHolder holder = + createCoordinatorHolder(tasks, TestingOperatorCoordinator::new, mainThreadExecutor); + + // Send one event without finishing it. + getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0)); + executor.triggerAll(); + + // Finish the event sending. This will insert one runnable that handle + // failed events to the executor. And we pending this runnable to + // simulates checkpoints triggered before the failure get processed. + executor.setPendingNewRunnables(true); + eventSendingResult.completeExceptionally(new RuntimeException("Artificial")); + executor.setPendingNewRunnables(false); + + // Triggers one checkpoint. + CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>(); + holder.checkpointCoordinator(1, checkpointResult); + executor.triggerAll(); + getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]); + executor.triggerAll(); + + assertTrue(checkpointResult.isCompletedExceptionally()); + } + + @Test + public void testCheckpointSuccessAfterFailedEventProcessed() throws Exception { + final EventReceivingTasks tasks = + EventReceivingTasks.createForRunningTasksFailingRpcs( + new RuntimeException("Artificial")); + final OperatorCoordinatorHolder holder = + createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); + + // Send one event and finish it. This will also cause the failure get process immediately. Review comment: ```suggestion // Send one event and finish it. This will also process the failure immediately. ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java ########## @@ -446,6 +525,27 @@ public OperatorCoordinator create(OperatorCoordinator.Context context) { return holder; } + private static class ReorderableManualExecutorService + extends ManuallyTriggeredScheduledExecutorService { + + private boolean pendingNewRunnables; + + private final Queue<Runnable> pendingRunnables = new ArrayDeque<>(); + + public void setPendingNewRunnables(boolean pendingNewRunnables) { Review comment: Maybe a better name would be `ignoreExecuteCalls(boolean)` or so. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/NonSuccessFuturesTrack.java ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.coordination.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This trackers remember the CompletableFutures if they are not complete normally. It also allows + * the callers to remove the failed ones after they have handled the future. + */ +public class NonSuccessFuturesTrack { Review comment: But I see why you have chosen `nonSuccess` because this either means incomplete or failed. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java ########## @@ -370,6 +373,82 @@ private void checkpointEventValueAtomicity( } } + @Test + public void testFailingCheckpointsIfSendingEventFailed() throws Exception { + CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<>(); + final EventReceivingTasks tasks = + EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult); + final OperatorCoordinatorHolder holder = + createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); + + // Send one event without finishing it. + getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0)); + + // Triggers one checkpoint. + CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>(); + holder.checkpointCoordinator(1, checkpointResult); + getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]); + + // Fails the event sending. + eventSendingResult.completeExceptionally(new RuntimeException("Artificial")); + + assertTrue(eventSendingResult.isCompletedExceptionally()); + } Review comment: Should the last assert be `assertTrue(checkpointResult.isCompletedExceptionally());`? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/NonSuccessFuturesTrack.java ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.coordination.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This trackers remember the CompletableFutures if they are not complete normally. It also allows + * the callers to remove the failed ones after they have handled the future. + */ +public class NonSuccessFuturesTrack { Review comment: And we still have the `IncompleteFuturesTracker` in the code base. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/NonSuccessFuturesTrack.java ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.coordination.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This trackers remember the CompletableFutures if they are not complete normally. It also allows + * the callers to remove the failed ones after they have handled the future. Review comment: ```suggestion * the callers to remove the failed futures. ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java ########## @@ -370,6 +373,82 @@ private void checkpointEventValueAtomicity( } } + @Test + public void testFailingCheckpointsIfSendingEventFailed() throws Exception { Review comment: ```suggestion public void testFailingCheckpointIfSendingEventFailed() throws Exception { ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/util/NonSuccessFuturesTrackerTest.java ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.coordination.util; + +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Unit tests for the {@link IncompleteFuturesTracker}. */ +public class NonSuccessFuturesTrackerTest { Review comment: ```suggestion public class NonSuccessFuturesTrackerTest extends TestLogger { ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java ########## @@ -446,6 +525,27 @@ public OperatorCoordinator create(OperatorCoordinator.Context context) { return holder; } + private static class ReorderableManualExecutorService + extends ManuallyTriggeredScheduledExecutorService { + + private boolean pendingNewRunnables; + + private final Queue<Runnable> pendingRunnables = new ArrayDeque<>(); + + public void setPendingNewRunnables(boolean pendingNewRunnables) { + this.pendingNewRunnables = pendingNewRunnables; + } + + @Override + public void execute(@Nonnull Runnable command) { + if (pendingNewRunnables) { + pendingRunnables.add(command); Review comment: Will the `Runnables` in `pendingRunnables` ever be executed or will they simply be ignored? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java ########## @@ -370,6 +373,82 @@ private void checkpointEventValueAtomicity( } } + @Test + public void testFailingCheckpointsIfSendingEventFailed() throws Exception { + CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<>(); + final EventReceivingTasks tasks = + EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult); + final OperatorCoordinatorHolder holder = + createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); + + // Send one event without finishing it. + getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0)); + + // Triggers one checkpoint. + CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>(); + holder.checkpointCoordinator(1, checkpointResult); + getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]); + + // Fails the event sending. + eventSendingResult.completeExceptionally(new RuntimeException("Artificial")); + + assertTrue(eventSendingResult.isCompletedExceptionally()); + } Review comment: What exactly is this test testing? To me it looks as if we are sending an event, then trigger a checkpoint, complete this checkpoint, then fail the `eventSendingResult` and then assert that `eventSendingResult` was indeed completed exceptionally. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java ########## @@ -370,6 +373,82 @@ private void checkpointEventValueAtomicity( } } + @Test + public void testFailingCheckpointsIfSendingEventFailed() throws Exception { + CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<>(); + final EventReceivingTasks tasks = + EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult); + final OperatorCoordinatorHolder holder = + createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); + + // Send one event without finishing it. + getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0)); + + // Triggers one checkpoint. + CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>(); + holder.checkpointCoordinator(1, checkpointResult); + getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]); + + // Fails the event sending. + eventSendingResult.completeExceptionally(new RuntimeException("Artificial")); + + assertTrue(eventSendingResult.isCompletedExceptionally()); + } + + @Test + public void testFailingCheckpointIfFailedEventNotProcessed() throws Exception { + final ReorderableManualExecutorService executor = new ReorderableManualExecutorService(); + final ComponentMainThreadExecutor mainThreadExecutor = + new ComponentMainThreadExecutorServiceAdapter( + (ScheduledExecutorService) executor, Thread.currentThread()); + + CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<>(); + final EventReceivingTasks tasks = + EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult); + + final OperatorCoordinatorHolder holder = + createCoordinatorHolder(tasks, TestingOperatorCoordinator::new, mainThreadExecutor); + + // Send one event without finishing it. + getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0)); + executor.triggerAll(); + + // Finish the event sending. This will insert one runnable that handle + // failed events to the executor. And we pending this runnable to Review comment: What do you mean with "we pending this runnable"? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/NonSuccessFuturesTrack.java ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.coordination.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This trackers remember the CompletableFutures if they are not complete normally. It also allows + * the callers to remove the failed ones after they have handled the future. + */ +public class NonSuccessFuturesTrack { + + private final ReentrantLock lock = new ReentrantLock(); + + private final HashSet<CompletableFuture<?>> nonSuccessFutures = new HashSet<>(); + + public void trackFuture(CompletableFuture<?> future) { + if (future.isDone() && !future.isCompletedExceptionally()) { + return; + } + + lock.lock(); + try { + nonSuccessFutures.add(future); + } finally { + lock.unlock(); + } + + future.whenComplete( + (success, failure) -> { + if (failure == null) { + removeFromSet(future); + } + }); + } + + public Collection<CompletableFuture<?>> getNonSuccessFutures() { + lock.lock(); + try { + if (nonSuccessFutures.isEmpty()) { + return Collections.emptySet(); + } + + return new ArrayList<>(nonSuccessFutures); + } finally { + lock.unlock(); + } + } + + public void removeFailedFuture(CompletableFuture<?> future) { + checkState(future.isCompletedExceptionally(), "The future is not fail"); Review comment: ```suggestion checkArgument(future.isCompletedExceptionally(), "The future is not failed"); ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java ########## @@ -72,19 +79,28 @@ FutureUtils.assertNoException( result.handleAsync( (success, failure) -> { - if (failure != null && subtaskAccess.isStillRunning()) { - String msg = - String.format( - EVENT_LOSS_ERROR_MESSAGE, - evt, - subtaskAccess.subtaskName()); - subtaskAccess.triggerTaskFailover(new FlinkException(msg, failure)); + if (failure != null) { + if (subtaskAccess.isStillRunning()) { + String msg = + String.format( + EVENT_LOSS_ERROR_MESSAGE, + evt, + subtaskAccess.subtaskName()); + subtaskAccess.triggerTaskFailover( + new FlinkException(msg, failure)); + } + + nonSuccessFuturesTrack.removeFailedFuture(result); Review comment: I am wondering whether it wouldn't be simpler to change `result.handleAsync` to `result.whenAsync` and then to add the result of this operation to the `incompleteFuturesTracker`? That way we are sure that we will have handled the result before doing any other operations (e.g. failing/completing checkpoints). ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/NonSuccessFuturesTrack.java ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.coordination.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This trackers remember the CompletableFutures if they are not complete normally. It also allows + * the callers to remove the failed ones after they have handled the future. + */ +public class NonSuccessFuturesTrack { Review comment: ```suggestion public class NonSuccessFuturesTracker { ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java ########## @@ -370,6 +373,82 @@ private void checkpointEventValueAtomicity( } } + @Test + public void testFailingCheckpointsIfSendingEventFailed() throws Exception { + CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<>(); + final EventReceivingTasks tasks = + EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult); + final OperatorCoordinatorHolder holder = + createCoordinatorHolder(tasks, TestingOperatorCoordinator::new); + + // Send one event without finishing it. + getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0)); + + // Triggers one checkpoint. + CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>(); + holder.checkpointCoordinator(1, checkpointResult); + getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]); + + // Fails the event sending. + eventSendingResult.completeExceptionally(new RuntimeException("Artificial")); + + assertTrue(eventSendingResult.isCompletedExceptionally()); + } + + @Test + public void testFailingCheckpointIfFailedEventNotProcessed() throws Exception { + final ReorderableManualExecutorService executor = new ReorderableManualExecutorService(); + final ComponentMainThreadExecutor mainThreadExecutor = + new ComponentMainThreadExecutorServiceAdapter( + (ScheduledExecutorService) executor, Thread.currentThread()); + + CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<>(); + final EventReceivingTasks tasks = + EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult); + + final OperatorCoordinatorHolder holder = + createCoordinatorHolder(tasks, TestingOperatorCoordinator::new, mainThreadExecutor); + + // Send one event without finishing it. + getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0)); + executor.triggerAll(); + + // Finish the event sending. This will insert one runnable that handle Review comment: ```suggestion // Finish the event sending. This will insert one runnable that handles ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/NonSuccessFuturesTrack.java ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.coordination.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This trackers remember the CompletableFutures if they are not complete normally. It also allows + * the callers to remove the failed ones after they have handled the future. + */ +public class NonSuccessFuturesTrack { Review comment: Maybe call this class `IncompleteFuturesTracker` since it tracks futures until they are either successfully completed or until the failed futures are manually removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
