This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 387bc38e4ff3b6015e12a6826a750d749f21e72f
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Sat Oct 31 23:08:07 2020 +0800

    [hotfix][runtime/operator] Make RecreateOnResetOperatorCoordinator fully 
asynchronous.
---
 .../RecreateOnResetOperatorCoordinator.java        | 268 ++++++++++++++++++---
 .../source/coordinator/SourceCoordinator.java      |  16 +-
 .../RecreateOnResetOperatorCoordinatorTest.java    | 177 +++++++++++++-
 .../coordination/TestingOperatorCoordinator.java   |  31 ++-
 .../source/coordinator/SourceCoordinatorTest.java  |   6 +-
 5 files changed, 451 insertions(+), 47 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index 03861b1..8ee69ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -21,30 +21,46 @@ package org.apache.flink.runtime.operators.coordination;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static 
org.apache.flink.util.ComponentClosingUtils.closeAsyncWithTimeout;
 
 /**
  * A class that will recreate a new {@link OperatorCoordinator} instance when
  * reset to checkpoint.
  */
 public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator 
{
-
+       private static final Logger LOG = 
LoggerFactory.getLogger(RecreateOnResetOperatorCoordinator.class);
+       private static final long CLOSING_TIMEOUT_MS = 60000L;
        private final Provider provider;
-       private QuiesceableContext quiesceableContext;
-       private OperatorCoordinator coordinator;
-
-       private boolean started;
+       private final long closingTimeoutMs;
+       private final OperatorCoordinator.Context context;
+       private DeferrableCoordinator coordinator;
+       private volatile boolean started;
+       private volatile boolean closed;
 
        private RecreateOnResetOperatorCoordinator(
-                       QuiesceableContext context,
-                       Provider provider) throws Exception {
-               this.quiesceableContext = context;
+                       OperatorCoordinator.Context context,
+                       Provider provider,
+                       long closingTimeoutMs) throws Exception {
+               this.context = context;
                this.provider = provider;
-               this.coordinator = provider.getCoordinator(context);
+               this.coordinator = new 
DeferrableCoordinator(context.getOperatorId());
+               this.coordinator.createNewInternalCoordinator(context, 
provider);
+               this.coordinator.processPendingCalls();
+               this.closingTimeoutMs = closingTimeoutMs;
                this.started = false;
+               this.closed = false;
        }
 
        @Override
@@ -55,55 +71,80 @@ public class RecreateOnResetOperatorCoordinator implements 
OperatorCoordinator {
 
        @Override
        public void close() throws Exception {
-               coordinator.close();
+               closed = true;
+               coordinator.closeAsync(closingTimeoutMs);
        }
 
        @Override
        public void handleEventFromOperator(int subtask, OperatorEvent event) 
throws Exception {
-               coordinator.handleEventFromOperator(subtask, event);
+               coordinator.applyCall(
+                               "handleEventFromOperator",
+                               c -> c.handleEventFromOperator(subtask, event));
        }
 
        @Override
        public void subtaskFailed(int subtask, @Nullable Throwable reason) {
-               coordinator.subtaskFailed(subtask, reason);
+               coordinator.applyCall(
+                               "subtaskFailed",
+                               c -> c.subtaskFailed(subtask, reason));
        }
 
        @Override
        public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture) throws Exception {
-               coordinator.checkpointCoordinator(checkpointId, resultFuture);
+               coordinator.applyCall(
+                               "checkpointCoordinator",
+                               c -> c.checkpointCoordinator(checkpointId, 
resultFuture));
        }
 
        @Override
        public void notifyCheckpointComplete(long checkpointId) {
-               coordinator.notifyCheckpointComplete(checkpointId);
+               coordinator.applyCall(
+                               "checkpointComplete",
+                               c -> c.notifyCheckpointComplete(checkpointId));
        }
 
        @Override
        public void resetToCheckpoint(byte[] checkpointData) throws Exception {
-               // Quiesce the context so the coordinator cannot interact with 
the job master anymore.
-               quiesceableContext.quiesce();
-               // Close the coordinator.
-               coordinator.close();
-               // Create a new coordinator and reset to the checkpoint.
-               quiesceableContext = new 
QuiesceableContext(quiesceableContext.getContext());
-               coordinator = provider.getCoordinator(quiesceableContext);
-               coordinator.resetToCheckpoint(checkpointData);
-               // Start the new coordinator if this coordinator has been 
started before reset to the checkpoint.
-               if (started) {
-                       coordinator.start();
-               }
+               // First fence out the active coordinator so that subsequent calls 
cannot reach it.
+               LOG.info("Resetting coordinator to checkpoint.");
+               // Replace the coordinator variable with a new 
DeferrableCoordinator instance.
+               // At this point the internal coordinator of the new 
coordinator has not been created.
+               // After this point all the subsequent calls will be made to 
the new coordinator.
+               final DeferrableCoordinator oldCoordinator = coordinator;
+               final DeferrableCoordinator newCoordinator =
+                       new DeferrableCoordinator(context.getOperatorId());
+               coordinator = newCoordinator;
+               // Close the old coordinator asynchronously in a separate 
closing thread.
+               // The future will be completed when the old coordinator closes.
+               CompletableFuture<Void> closingFuture = 
oldCoordinator.closeAsync(closingTimeoutMs);
+               // Once the old coordinator has closed, create, reset and start the new one.
+               closingFuture.thenRun(() -> {
+                       if (!closed) {
+                               // The previous coordinator has closed. Create 
a new one.
+                               
newCoordinator.createNewInternalCoordinator(context, provider);
+                               newCoordinator.resetAndStart(checkpointData, 
started);
+                               newCoordinator.processPendingCalls();
+                       }
+               });
        }
 
        // ---------------------
 
        @VisibleForTesting
        public OperatorCoordinator getInternalCoordinator() {
-               return coordinator;
+               return coordinator.internalCoordinator;
        }
 
        @VisibleForTesting
        QuiesceableContext getQuiesceableContext() {
-               return quiesceableContext;
+               return coordinator.internalQuiesceableContext;
+       }
+
+       @VisibleForTesting
+       void waitForAllAsyncCallsFinish() throws Exception {
+               CompletableFuture<Void> future = new CompletableFuture<>();
+               coordinator.applyCall("waitForAllAsyncCallsFinish", c -> 
future.complete(null));
+               future.get();
        }
 
        // ---------------------
@@ -123,8 +164,12 @@ public class RecreateOnResetOperatorCoordinator implements 
OperatorCoordinator {
 
                @Override
                public OperatorCoordinator create(Context context) throws 
Exception {
-                       QuiesceableContext quiesceableContext = new 
QuiesceableContext(context);
-                       return new 
RecreateOnResetOperatorCoordinator(quiesceableContext, this);
+                       return create(context, CLOSING_TIMEOUT_MS);
+               }
+
+               @VisibleForTesting
+               protected OperatorCoordinator create(Context context, long 
closingTimeoutMs) throws Exception {
+                       return new RecreateOnResetOperatorCoordinator(context, 
this, closingTimeoutMs);
                }
 
                protected abstract OperatorCoordinator 
getCoordinator(OperatorCoordinator.Context context) throws Exception;
@@ -193,4 +238,167 @@ public class RecreateOnResetOperatorCoordinator 
implements OperatorCoordinator {
                        return context;
                }
        }
+
+       /**
+        * A class that helps realize the fully async {@link 
#resetToCheckpoint(byte[])} behavior.
+        * The class wraps an {@link OperatorCoordinator} instance. It is going 
to be accessed
+        * by two different threads: the scheduler thread and the closing thread 
created in
+        * {@link #closeAsync(long)}. A DeferrableCoordinator can be in one of three 
states:
+        *
+        * <ul>
+        *     <li><b>deferred:</b> The internal {@link OperatorCoordinator} 
has not been
+        *     created and all the method calls to the 
RecreateOnResetOperatorCoordinator are
+        *     added to a Queue.</li>
+        *     <li><b>catching up:</b> The internal {@link OperatorCoordinator} 
has been created
+        *     and is processing the queued up method calls. In this state, all 
the method calls
+        *     to the RecreateOnResetOperatorCoordinator are still going to be 
enqueued to
+        *     ensure the correct execution order.</li>
+        *     <li><b>caught up:</b> The internal {@link OperatorCoordinator} 
has finished
+        *     processing all the queued up method calls. From this point on, 
the method calls
+        *     to this coordinator will be executed in the caller thread 
directly instead of
+        *     being put into the queue.</li>
+        * </ul>
+        */
+       private static class DeferrableCoordinator {
+               private final OperatorID operatorId;
+               private final BlockingQueue<NamedCall> pendingCalls;
+               private QuiesceableContext internalQuiesceableContext;
+               private OperatorCoordinator internalCoordinator;
+               private boolean hasCaughtUp;
+               private boolean closed;
+               private volatile boolean failed;
+
+               private DeferrableCoordinator(OperatorID operatorId) {
+                       this.operatorId = operatorId;
+                       this.pendingCalls = new LinkedBlockingQueue<>();
+                       this.hasCaughtUp = false;
+                       this.closed = false;
+                       this.failed = false;
+               }
+
+               synchronized <T extends Exception> void applyCall(
+                               String name,
+                               ThrowingConsumer<OperatorCoordinator, T> call) 
throws T {
+                       synchronized (this) {
+                               if (hasCaughtUp) {
+                                       // The new coordinator has caught up.
+                                       call.accept(internalCoordinator);
+                               } else {
+                                       pendingCalls.add(new NamedCall(name, 
call));
+                               }
+                       }
+               }
+
+               synchronized void createNewInternalCoordinator(
+                               OperatorCoordinator.Context context,
+                               Provider provider) {
+                       if (closed) {
+                               return;
+                       }
+                       // Create a new internal coordinator and a new 
quiesceable context.
+                       // We assume that the coordinator creation is fast. 
Otherwise the creation
+                       // of the new internal coordinator may block the 
applyCall() method
+                       // which is invoked in the scheduler main thread.
+                       try {
+                               internalQuiesceableContext = new 
QuiesceableContext(context);
+                               internalCoordinator = 
provider.getCoordinator(internalQuiesceableContext);
+                       } catch (Exception e) {
+                               LOG.error("Failed to create new internal 
coordinator due to ", e);
+                               cleanAndFailJob(e);
+                       }
+               }
+
+               synchronized CompletableFuture<Void> closeAsync(long timeoutMs) 
{
+                       closed = true;
+                       if (internalCoordinator != null) {
+                               internalQuiesceableContext.quiesce();
+                               pendingCalls.clear();
+                               return closeAsyncWithTimeout(
+                                       "SourceCoordinator for " + operatorId,
+                                       (ThrowingRunnable<Exception>) 
internalCoordinator::close,
+                                       timeoutMs).exceptionally(e -> {
+                                               cleanAndFailJob(e);
+                                               return null;
+                                       });
+                       } else {
+                               return CompletableFuture.completedFuture(null);
+                       }
+               }
+
+               void processPendingCalls() {
+                       if (failed || closed || internalCoordinator == null) {
+                               return;
+                       }
+                       String name = "Unknown Call Name";
+                       try {
+                               while (!hasCaughtUp) {
+                                       while (!pendingCalls.isEmpty()) {
+                                               NamedCall namedCall = 
pendingCalls.poll();
+                                               if (namedCall != null) {
+                                                       name = namedCall.name;
+                                                       
namedCall.getConsumer().accept(internalCoordinator);
+                                               }
+                                       }
+                                       synchronized (this) {
+                                               // We need to check the pending 
calls queue again in case a new
+                                               // pending call is added after 
we process the last one and before
+                                               // we grab the lock.
+                                               if (pendingCalls.isEmpty()) {
+                                                       hasCaughtUp = true;
+                                               }
+                                       }
+                               }
+                       } catch (Throwable t) {
+                               LOG.error("Failed to process pending calls {} 
on coordinator.", name, t);
+                               cleanAndFailJob(t);
+                       }
+               }
+
+               void start() throws Exception {
+                       internalCoordinator.start();
+               }
+
+               void resetAndStart(byte[] checkpointData, boolean started) {
+                       if (failed || closed || internalCoordinator == null) {
+                               return;
+                       }
+                       try {
+                               
internalCoordinator.resetToCheckpoint(checkpointData);
+                               // Start the new coordinator if this 
coordinator has been started before reset to the checkpoint.
+                               if (started) {
+                                       internalCoordinator.start();
+                               }
+                       } catch (Exception e) {
+                               LOG.error("Failed to reset the coordinator to 
checkpoint and start.", e);
+                               cleanAndFailJob(e);
+                       }
+               }
+
+               private void cleanAndFailJob(Throwable t) {
+                       // Don't repeatedly fail the job.
+                       if (!failed) {
+                               failed = true;
+                               
internalQuiesceableContext.getContext().failJob(t);
+                               pendingCalls.clear();
+                       }
+               }
+       }
+
+       private static class NamedCall {
+               private final String name;
+               private final ThrowingConsumer<OperatorCoordinator, ?> consumer;
+
+               private NamedCall(String name, 
ThrowingConsumer<OperatorCoordinator, ?> consumer) {
+                       this.name = name;
+                       this.consumer = consumer;
+               }
+
+               public String getName() {
+                       return name;
+               }
+
+               public ThrowingConsumer<OperatorCoordinator, ?> getConsumer() {
+                       return consumer;
+               }
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index acbc93d..10baf02 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -48,7 +48,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import static 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readAndVerifyCoordinatorSerdeVersion;
 import static 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readBytes;
@@ -105,14 +104,16 @@ public class SourceCoordinator<SplitT extends 
SourceSplit, EnumChkT> implements
        @Override
        public void start() throws Exception {
                LOG.info("Starting split enumerator for source {}.", 
operatorName);
-               enumerator.start();
+               // The start sequence is the first task in the coordinator 
executor.
+               // We rely on the single-threaded coordinator executor to 
guarantee
+               // the other methods are invoked after the enumerator has 
started.
+               coordinatorExecutor.execute(() -> enumerator.start());
                started = true;
        }
 
        @Override
        public void close() throws Exception {
                LOG.info("Closing SourceCoordinator for source {}.", 
operatorName);
-               boolean successfullyClosed = false;
                try {
                        if (started) {
                                context.close();
@@ -120,12 +121,9 @@ public class SourceCoordinator<SplitT extends SourceSplit, 
EnumChkT> implements
                        }
                } finally {
                        coordinatorExecutor.shutdownNow();
-                       // We do not expect this to actually block for long. At 
this point, there should be very few task running
-                       // in the executor, if any.
-                       successfullyClosed = 
coordinatorExecutor.awaitTermination(10, TimeUnit.SECONDS);
-               }
-               if (!successfullyClosed) {
-                       throw new TimeoutException("The source coordinator 
failed to close before timeout.");
+                       // We do not expect this to actually block for long. At 
this point, there should
+                       // be very few tasks running in the executor, if any.
+                       coordinatorExecutor.awaitTermination(Long.MAX_VALUE, 
TimeUnit.SECONDS);
                }
                LOG.info("Source coordinator for source {} closed.", 
operatorName);
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
index 7ff487f..c62b73b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
@@ -18,11 +18,17 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 
 import org.junit.Test;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -70,7 +76,7 @@ public class RecreateOnResetOperatorCoordinatorTest {
 
        @Test
        public void testResetToCheckpoint() throws Exception {
-               TestingCoordinatorProvider provider = new 
TestingCoordinatorProvider();
+               TestingCoordinatorProvider provider = new 
TestingCoordinatorProvider(null);
                MockOperatorCoordinatorContext context = new 
MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
                RecreateOnResetOperatorCoordinator coordinator = 
createCoordinator(provider, context);
 
@@ -80,6 +86,9 @@ public class RecreateOnResetOperatorCoordinatorTest {
                byte[] stateToRestore = new byte[0];
                coordinator.resetToCheckpoint(stateToRestore);
 
+               // Wait until all the previously issued method invocations have 
been applied.
+               coordinator.waitForAllAsyncCallsFinish();
+
                assertTrue(contextBeforeReset.isQuiesced());
                
assertNull(internalCoordinatorBeforeReset.getLastRestoredCheckpointState());
 
@@ -87,6 +96,141 @@ public class RecreateOnResetOperatorCoordinatorTest {
                assertEquals(stateToRestore, 
internalCoordinatorAfterReset.getLastRestoredCheckpointState());
        }
 
+       @Test
+       public void testResetToCheckpointTimeout() throws Exception {
+               final long closingTimeoutMs = 1L;
+               // Let the user coordinator block on close.
+               TestingCoordinatorProvider provider = new 
TestingCoordinatorProvider(new CountDownLatch(1));
+               MockOperatorCoordinatorContext context = new 
MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
+               RecreateOnResetOperatorCoordinator coordinator =
+                               (RecreateOnResetOperatorCoordinator) 
provider.create(context, closingTimeoutMs);
+
+               coordinator.resetToCheckpoint(new byte[0]);
+               CommonTestUtils.waitUtil(
+                       context::isJobFailed,
+                       Duration.ofSeconds(5),
+                       "The job should fail due to resetToCheckpoint() 
timeout.");
+       }
+
+       @Test
+       public void testMethodCallsOnLongResetToCheckpoint() throws Exception {
+               final long closingTimeoutMs = Long.MAX_VALUE;
+               final CountDownLatch blockOnCloseLatch = new CountDownLatch(1);
+               // Let the user coordinator block on close.
+               TestingCoordinatorProvider provider = new 
TestingCoordinatorProvider(blockOnCloseLatch);
+               MockOperatorCoordinatorContext context = new 
MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
+               RecreateOnResetOperatorCoordinator coordinator =
+                               (RecreateOnResetOperatorCoordinator) 
provider.create(context, closingTimeoutMs);
+
+               // Set up the testing variables.
+               final byte[] restoredState = new byte[0];
+               final TestingEvent testingEvent = new TestingEvent();
+               final long completedCheckpointId = 1234L;
+
+               // Reset the coordinator which closes the current internal 
coordinator
+               // and then creates a new one. The closing of the current 
internal
+               // coordinator will block until the blockOnCloseLatch is counted down.
+               coordinator.resetToCheckpoint(restoredState);
+
+               // The following method calls should be applied to the new 
internal
+               // coordinator asynchronously because the current coordinator 
has not
+               // been successfully closed yet.
+               coordinator.handleEventFromOperator(1, testingEvent);
+               coordinator.subtaskFailed(1, new Exception("Subtask Failure 
Exception."));
+               coordinator.notifyCheckpointComplete(completedCheckpointId);
+
+               // The new coordinator should not have been created yet because 
closing
+               // the current coordinator is still blocked on the latch.
+               assertEquals(1, provider.getCreatedCoordinators().size());
+
+               // Now unblock the closing of the current coordinator.
+               blockOnCloseLatch.countDown();
+
+               // Take a checkpoint on the coordinator after reset.
+               CompletableFuture<byte[]> checkpointFuture = new 
CompletableFuture<>();
+               coordinator.checkpointCoordinator(5678L, checkpointFuture);
+               coordinator.waitForAllAsyncCallsFinish();
+
+               // Verify that the method calls have been made against the new 
coordinator.
+               TestingOperatorCoordinator internalCoordinatorAfterReset = 
getInternalCoordinator(coordinator);
+               // The internal coordinator after reset should have triggered a 
new checkpoint.
+               assertEquals(checkpointFuture, 
internalCoordinatorAfterReset.getLastTriggeredCheckpoint());
+               // The internal coordinator after reset should be the second 
coordinator created by the provider.
+               assertEquals(provider.getCreatedCoordinators().get(1), 
internalCoordinatorAfterReset);
+               // The internal coordinator after reset should have been reset 
to the restored state.
+               assertEquals(restoredState, 
internalCoordinatorAfterReset.getLastRestoredCheckpointState());
+               // The internal coordinator after reset should have received 
the testing event.
+               assertEquals(testingEvent, 
internalCoordinatorAfterReset.getNextReceivedOperatorEvent());
+               // The internal coordinator after reset should have handled the 
failure of subtask 1.
+               assertEquals(Collections.singletonList(1), 
internalCoordinatorAfterReset.getFailedTasks());
+               // The internal coordinator after reset should have the 
completedCheckpointId.
+               assertEquals(completedCheckpointId, 
internalCoordinatorAfterReset.getLastCheckpointComplete());
+
+       }
+
+       @Test(timeout = 30000L)
+       public void testConsecutiveResetToCheckpoint() throws Exception {
+               final long closingTimeoutMs = Long.MAX_VALUE;
+               final int numResets = 1000;
+               // The user coordinator does not block on close in this test.
+               TestingCoordinatorProvider provider = new 
TestingCoordinatorProvider();
+               MockOperatorCoordinatorContext context = new 
MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
+               RecreateOnResetOperatorCoordinator coordinator =
+                               (RecreateOnResetOperatorCoordinator) 
provider.create(context, closingTimeoutMs);
+
+               // Loop to get some interleaved method invocations on multiple 
instances
+               // of active coordinators.
+               for (int i = 0; i < numResets; i++) {
+                       coordinator.handleEventFromOperator(1, new 
TestingEvent(i));
+                       coordinator.subtaskFailed(i, new Exception());
+                       CompletableFuture<byte[]> future = 
CompletableFuture.completedFuture(new byte[i]);
+                       coordinator.checkpointCoordinator(i, future);
+                       final int loop = i;
+                       future.thenRun(() -> 
coordinator.notifyCheckpointComplete(loop));
+                       // The reset bytes have a length of i+1 here because 
this will be reset to the
+                       // next internal coordinator.
+                       coordinator.resetToCheckpoint(new byte[i + 1]);
+               }
+
+               coordinator.waitForAllAsyncCallsFinish();
+
+               // Verify that the method calls have been made against the 
coordinators.
+               for (TestingOperatorCoordinator internalCoordinator : 
provider.getCreatedCoordinators()) {
+                       // The indexOfCoordinator is set to 0 by default 
because:
+                       // 1. For the initial internal coordinator, its index 
is 0.
+                       // 2. For all the subsequent internal coordinators, 
there are two cases:
+                       //    a. they have processed at least one method call. 
In that case the coordinator
+                       //       must have been restored to the given state. So 
the indexOfCoordinator will
+                       //       be updated correctly.
+                       //    b. no method call was processed. In this case the 
indexOfCoordinator does not
+                       //       matter because all the fields will either be 
empty or null.
+                       int indexOfCoordinator = 0;
+                       byte[] lastRestoredState = 
internalCoordinator.getLastRestoredCheckpointState();
+                       if (lastRestoredState != null) {
+                               indexOfCoordinator = lastRestoredState.length;
+                       }
+                       TestingEvent testingEvent = (TestingEvent) 
internalCoordinator.getNextReceivedOperatorEvent();
+                       List<Integer> failedTasks = 
internalCoordinator.getFailedTasks();
+
+                       assertTrue(testingEvent == null || testingEvent.getId() 
== indexOfCoordinator);
+                       assertTrue(failedTasks.isEmpty() || (failedTasks.size() 
== 1 && failedTasks.get(0) == indexOfCoordinator));
+                       assertTrue(!internalCoordinator.hasCompleteCheckpoint() 
||
+                                                          
internalCoordinator.getLastCheckpointComplete() == indexOfCoordinator);
+                       
assertTrue(!internalCoordinator.hasTriggeredCheckpoint() ||
+                                                          
internalCoordinator.getLastTriggeredCheckpoint().get().length == 
indexOfCoordinator);
+               }
+               coordinator.close();
+               TestingOperatorCoordinator internalCoordinator = 
getInternalCoordinator(coordinator);
+               CommonTestUtils.waitUtil(
+                       internalCoordinator::isClosed,
+                       Duration.ofSeconds(5),
+                       "Timed out when waiting for the coordinator to close.");
+       }
+
+       public void testFailureInCreateCoordinator() {
+
+       }
+
        // ---------------
 
        private RecreateOnResetOperatorCoordinator createCoordinator(
@@ -103,18 +247,47 @@ public class RecreateOnResetOperatorCoordinatorTest {
 
        private static class TestingCoordinatorProvider extends 
RecreateOnResetOperatorCoordinator.Provider {
                private static final long serialVersionUID = 
4184184580789587013L;
+               private final CountDownLatch blockOnCloseLatch;
+               private final List<TestingOperatorCoordinator> 
createdCoordinators;
 
                public TestingCoordinatorProvider() {
+                       this(null);
+               }
+
+               public TestingCoordinatorProvider(
+                               CountDownLatch blockOnCloseLatch) {
                        super(OPERATOR_ID);
+                       this.blockOnCloseLatch = blockOnCloseLatch;
+                       this.createdCoordinators = new ArrayList<>();
                }
 
                @Override
                protected OperatorCoordinator 
getCoordinator(OperatorCoordinator.Context context) {
-                       return new TestingOperatorCoordinator(context);
+                       TestingOperatorCoordinator testingCoordinator =
+                                       new TestingOperatorCoordinator(context, 
blockOnCloseLatch);
+                       createdCoordinators.add(testingCoordinator);
+                       return testingCoordinator;
+               }
+
+               private List<TestingOperatorCoordinator> 
getCreatedCoordinators() {
+                       return createdCoordinators;
                }
        }
 
        private static class TestingEvent implements OperatorEvent {
                private static final long serialVersionUID = 
-3289352911927668275L;
+               private final int id;
+
+               private TestingEvent() {
+                       this(-1);
+               }
+
+               private TestingEvent(int id) {
+                       this.id = id;
+               }
+
+               private int getId() {
+                       return id;
+               }
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
index e4d8b04..5c5b91f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
@@ -24,9 +24,10 @@ import org.apache.flink.runtime.util.SerializableFunction;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
@@ -38,6 +39,8 @@ class TestingOperatorCoordinator implements 
OperatorCoordinator {
 
        private final ArrayList<Integer> failedTasks = new ArrayList<>();
 
+       private final CountDownLatch blockOnCloseLatch;
+
        @Nullable
        private byte[] lastRestoredCheckpointState;
 
@@ -45,13 +48,23 @@ class TestingOperatorCoordinator implements 
OperatorCoordinator {
 
        private BlockingQueue<Long> lastCheckpointComplete;
 
+       private BlockingQueue<OperatorEvent> receivedOperatorEvents;
+
        private boolean started;
        private boolean closed;
 
        public TestingOperatorCoordinator(OperatorCoordinator.Context context) {
+               this(context, null);
+       }
+
+       public TestingOperatorCoordinator(
+                       OperatorCoordinator.Context context,
+                       CountDownLatch blockOnCloseLatch) {
                this.context = context;
                this.triggeredCheckpoints = new LinkedBlockingQueue<>();
                this.lastCheckpointComplete = new LinkedBlockingQueue<>();
+               this.receivedOperatorEvents = new LinkedBlockingQueue<>();
+               this.blockOnCloseLatch = blockOnCloseLatch;
        }
 
        // 
------------------------------------------------------------------------
@@ -62,12 +75,17 @@ class TestingOperatorCoordinator implements 
OperatorCoordinator {
        }
 
        @Override
-       public void close() {
+       public void close() throws InterruptedException {
                closed = true;
+               if (blockOnCloseLatch != null) {
+                       blockOnCloseLatch.await();
+               }
        }
 
        @Override
-       public void handleEventFromOperator(int subtask, OperatorEvent event) {}
+       public void handleEventFromOperator(int subtask, OperatorEvent event) {
+               receivedOperatorEvents.add(event);
+       }
 
        @Override
        public void subtaskFailed(int subtask, @Nullable Throwable reason) {
@@ -104,7 +122,7 @@ class TestingOperatorCoordinator implements 
OperatorCoordinator {
                return closed;
        }
 
-       public Collection<Integer> getFailedTasks() {
+       public List<Integer> getFailedTasks() {
                return failedTasks;
        }
 
@@ -125,6 +143,11 @@ class TestingOperatorCoordinator implements 
OperatorCoordinator {
                return lastCheckpointComplete.take();
        }
 
+       @Nullable
+       public OperatorEvent getNextReceivedOperatorEvent() {
+               return receivedOperatorEvents.poll();
+       }
+
        public boolean hasCompleteCheckpoint() throws InterruptedException {
                return !lastCheckpointComplete.isEmpty();
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index 6f6b5ba..0ddf971 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -74,11 +74,13 @@ public class SourceCoordinatorTest extends 
SourceCoordinatorTestBase {
                                                "only be reset to a checkpoint 
before it starts.", OPERATOR_NAME));
        }
 
-       @Test
+       @Test(timeout = 10000L)
        public void testStart() throws Exception {
                assertFalse(enumerator.started());
                sourceCoordinator.start();
-               assertTrue(enumerator.started());
+               while (!enumerator.started()) {
+                       Thread.sleep(1);
+               }
        }
 
        @Test

Reply via email to