This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 387bc38e4ff3b6015e12a6826a750d749f21e72f Author: Jiangjie (Becket) Qin <[email protected]> AuthorDate: Sat Oct 31 23:08:07 2020 +0800 [hotfix][runtime/operator] Make RecreateOnResetOperatorCoordinator fully asynchronous. --- .../RecreateOnResetOperatorCoordinator.java | 268 ++++++++++++++++++--- .../source/coordinator/SourceCoordinator.java | 16 +- .../RecreateOnResetOperatorCoordinatorTest.java | 177 +++++++++++++- .../coordination/TestingOperatorCoordinator.java | 31 ++- .../source/coordinator/SourceCoordinatorTest.java | 6 +- 5 files changed, 451 insertions(+), 47 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java index 03861b1..8ee69ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java @@ -21,30 +21,46 @@ package org.apache.flink.runtime.operators.coordination; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.apache.flink.util.ComponentClosingUtils.closeAsyncWithTimeout; /** * A class that will recreate a new {@link OperatorCoordinator} instance when * reset to checkpoint. */ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator { - + private static final Logger LOG = LoggerFactory.getLogger(RecreateOnResetOperatorCoordinator.class); + private static final long CLOSING_TIMEOUT_MS = 60000L; private final Provider provider; - private QuiesceableContext quiesceableContext; - private OperatorCoordinator coordinator; - - private boolean started; + private final long closingTimeoutMs; + private final OperatorCoordinator.Context context; + private DeferrableCoordinator coordinator; + private volatile boolean started; + private volatile boolean closed; private RecreateOnResetOperatorCoordinator( - QuiesceableContext context, - Provider provider) throws Exception { - this.quiesceableContext = context; + OperatorCoordinator.Context context, + Provider provider, + long closingTimeoutMs) throws Exception { + this.context = context; this.provider = provider; - this.coordinator = provider.getCoordinator(context); + this.coordinator = new DeferrableCoordinator(context.getOperatorId()); + this.coordinator.createNewInternalCoordinator(context, provider); + this.coordinator.processPendingCalls(); + this.closingTimeoutMs = closingTimeoutMs; this.started = false; + this.closed = false; } @Override @@ -55,55 +71,80 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator { @Override public void close() throws Exception { - coordinator.close(); + closed = true; + coordinator.closeAsync(closingTimeoutMs); } @Override public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception { - coordinator.handleEventFromOperator(subtask, event); + coordinator.applyCall( + "handleEventFromOperator", + c -> c.handleEventFromOperator(subtask, event)); } @Override public void subtaskFailed(int subtask, @Nullable Throwable reason) { - coordinator.subtaskFailed(subtask, reason); + coordinator.applyCall( + "subtaskFailed", + c -> c.subtaskFailed(subtask, reason)); } @Override public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception { - coordinator.checkpointCoordinator(checkpointId, resultFuture); + coordinator.applyCall( + "checkpointCoordinator", + c -> c.checkpointCoordinator(checkpointId, resultFuture)); } @Override public void notifyCheckpointComplete(long checkpointId) { - coordinator.notifyCheckpointComplete(checkpointId); + coordinator.applyCall( + "checkpointComplete", + c -> c.notifyCheckpointComplete(checkpointId)); } @Override public void resetToCheckpoint(byte[] checkpointData) throws Exception { - // Quiesce the context so the coordinator cannot interact with the job master anymore. - quiesceableContext.quiesce(); - // Close the coordinator. - coordinator.close(); - // Create a new coordinator and reset to the checkpoint. - quiesceableContext = new QuiesceableContext(quiesceableContext.getContext()); - coordinator = provider.getCoordinator(quiesceableContext); - coordinator.resetToCheckpoint(checkpointData); - // Start the new coordinator if this coordinator has been started before reset to the checkpoint. - if (started) { - coordinator.start(); - } + // First bump up the coordinator epoch to fence out the active coordinator. + LOG.info("Resetting coordinator to checkpoint."); + // Replace the coordinator variable with a new DeferrableCoordinator instance. + // At this point the internal coordinator of the new coordinator has not been created. + // After this point all the subsequent calls will be made to the new coordinator. + final DeferrableCoordinator oldCoordinator = coordinator; + final DeferrableCoordinator newCoordinator = + new DeferrableCoordinator(context.getOperatorId()); + coordinator = newCoordinator; + // Close the old coordinator asynchronously in a separate closing thread. + // The future will be completed when the old coordinator closes. + CompletableFuture<Void> closingFuture = oldCoordinator.closeAsync(closingTimeoutMs); + // Create and + closingFuture.thenRun(() -> { + if (!closed) { + // The previous coordinator has closed. Create a new one. + newCoordinator.createNewInternalCoordinator(context, provider); + newCoordinator.resetAndStart(checkpointData, started); + newCoordinator.processPendingCalls(); + } + }); } // --------------------- @VisibleForTesting public OperatorCoordinator getInternalCoordinator() { - return coordinator; + return coordinator.internalCoordinator; } @VisibleForTesting QuiesceableContext getQuiesceableContext() { - return quiesceableContext; + return coordinator.internalQuiesceableContext; + } + + @VisibleForTesting + void waitForAllAsyncCallsFinish() throws Exception { + CompletableFuture<Void> future = new CompletableFuture<>(); + coordinator.applyCall("waitForAllAsyncCallsFinish", c -> future.complete(null)); + future.get(); } // --------------------- @@ -123,8 +164,12 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator { @Override public OperatorCoordinator create(Context context) throws Exception { - QuiesceableContext quiesceableContext = new QuiesceableContext(context); - return new RecreateOnResetOperatorCoordinator(quiesceableContext, this); + return create(context, CLOSING_TIMEOUT_MS); + } + + @VisibleForTesting + protected OperatorCoordinator create(Context context, long closingTimeoutMs) throws Exception { + return new RecreateOnResetOperatorCoordinator(context, this, closingTimeoutMs); } protected abstract OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) throws Exception; @@ -193,4 +238,167 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator { return context; } } + + /** + * A class that helps realize the fully async {@link #resetToCheckpoint(byte[])} behavior. + * The class wraps an {@link OperatorCoordinator} instance. It is going to be accessed + * by two different thread: the scheduler thread and the closing thread created in + * {@link #closeAsync(long)}. A DeferrableCoordinator could be in three states: + * + * <ul> + * <li><b>deferred:</b> The internal {@link OperatorCoordinator} has not been + * created and all the method calls to the RecreateOnResetOperatorCoordinator are + * added to a Queue.</li> + * <li><b>catching up:</b> The internal {@link OperatorCoordinator} has been created + * and is processing the queued up method calls. In this state, all the method calls + * to the RecreateOnResetOperatorCoordinator are still going to be enqueued to + * ensure the correct execution order.</li> + * <li><b>caught up:</b> The internal {@link OperatorCoordinator} has finished + * processing all the queued up method calls. From this point on, the method calls + * to this coordinator will be executed in the caller thread directly instead of + * being put into the queue.</li> + * </ul> + */ + private static class DeferrableCoordinator { + private final OperatorID operatorId; + private final BlockingQueue<NamedCall> pendingCalls; + private QuiesceableContext internalQuiesceableContext; + private OperatorCoordinator internalCoordinator; + private boolean hasCaughtUp; + private boolean closed; + private volatile boolean failed; + + private DeferrableCoordinator(OperatorID operatorId) { + this.operatorId = operatorId; + this.pendingCalls = new LinkedBlockingQueue<>(); + this.hasCaughtUp = false; + this.closed = false; + this.failed = false; + } + + synchronized <T extends Exception> void applyCall( + String name, + ThrowingConsumer<OperatorCoordinator, T> call) throws T { + synchronized (this) { + if (hasCaughtUp) { + // The new coordinator has caught up. + call.accept(internalCoordinator); + } else { + pendingCalls.add(new NamedCall(name, call)); + } + } + } + + synchronized void createNewInternalCoordinator( + OperatorCoordinator.Context context, + Provider provider) { + if (closed) { + return; + } + // Create a new internal coordinator and a new quiesceable context. + // We assume that the coordinator creation is fast. Otherwise the creation + // of the new internal coordinator may block the applyCall() method + // which is invoked in the scheduler main thread. + try { + internalQuiesceableContext = new QuiesceableContext(context); + internalCoordinator = provider.getCoordinator(internalQuiesceableContext); + } catch (Exception e) { + LOG.error("Failed to create new internal coordinator due to ", e); + cleanAndFailJob(e); + } + } + + synchronized CompletableFuture<Void> closeAsync(long timeoutMs) { + closed = true; + if (internalCoordinator != null) { + internalQuiesceableContext.quiesce(); + pendingCalls.clear(); + return closeAsyncWithTimeout( + "SourceCoordinator for " + operatorId, + (ThrowingRunnable<Exception>) internalCoordinator::close, + timeoutMs).exceptionally(e -> { + cleanAndFailJob(e); + return null; + }); + } else { + return CompletableFuture.completedFuture(null); + } + } + + void processPendingCalls() { + if (failed || closed || internalCoordinator == null) { + return; + } + String name = "Unknown Call Name"; + try { + while (!hasCaughtUp) { + while (!pendingCalls.isEmpty()) { + NamedCall namedCall = pendingCalls.poll(); + if (namedCall != null) { + name = namedCall.name; + namedCall.getConsumer().accept(internalCoordinator); + } + } + synchronized (this) { + // We need to check the pending calls queue again in case a new + // pending call is added after we process the last one and before + // we grab the lock. + if (pendingCalls.isEmpty()) { + hasCaughtUp = true; + } + } + } + } catch (Throwable t) { + LOG.error("Failed to process pending calls {} on coordinator.", name, t); + cleanAndFailJob(t); + } + } + + void start() throws Exception { + internalCoordinator.start(); + } + + void resetAndStart(byte[] checkpointData, boolean started) { + if (failed || closed || internalCoordinator == null) { + return; + } + try { + internalCoordinator.resetToCheckpoint(checkpointData); + // Start the new coordinator if this coordinator has been started before reset to the checkpoint. + if (started) { + internalCoordinator.start(); + } + } catch (Exception e) { + LOG.error("Failed to reset the coordinator to checkpoint and start.", e); + cleanAndFailJob(e); + } + } + + private void cleanAndFailJob(Throwable t) { + // Don't repeatedly fail the job. + if (!failed) { + failed = true; + internalQuiesceableContext.getContext().failJob(t); + pendingCalls.clear(); + } + } + } + + private static class NamedCall { + private final String name; + private final ThrowingConsumer<OperatorCoordinator, ?> consumer; + + private NamedCall(String name, ThrowingConsumer<OperatorCoordinator, ?> consumer) { + this.name = name; + this.consumer = consumer; + } + + public String getName() { + return name; + } + + public ThrowingConsumer<OperatorCoordinator, ?> getConsumer() { + return consumer; + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java index acbc93d..10baf02 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java @@ -48,7 +48,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readAndVerifyCoordinatorSerdeVersion; import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readBytes; @@ -105,14 +104,16 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements @Override public void start() throws Exception { LOG.info("Starting split enumerator for source {}.", operatorName); - enumerator.start(); + // The start sequence is the first task in the coordinator executor. + // We rely on the single-threaded coordinator executor to guarantee + // the other methods are invoked after the enumerator has started. + coordinatorExecutor.execute(() -> enumerator.start()); started = true; } @Override public void close() throws Exception { LOG.info("Closing SourceCoordinator for source {}.", operatorName); - boolean successfullyClosed = false; try { if (started) { context.close(); @@ -120,12 +121,9 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements } } finally { coordinatorExecutor.shutdownNow(); - // We do not expect this to actually block for long. At this point, there should be very few task running - // in the executor, if any. - successfullyClosed = coordinatorExecutor.awaitTermination(10, TimeUnit.SECONDS); - } - if (!successfullyClosed) { - throw new TimeoutException("The source coordinator failed to close before timeout."); + // We do not expect this to actually block for long. At this point, there should + // be very few task running in the executor, if any. + coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); } LOG.info("Source coordinator for source {} closed.", operatorName); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java index 7ff487f..c62b73b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java @@ -18,11 +18,17 @@ package org.apache.flink.runtime.operators.coordination; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.jobgraph.OperatorID; import org.junit.Test; +import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -70,7 +76,7 @@ public class RecreateOnResetOperatorCoordinatorTest { @Test public void testResetToCheckpoint() throws Exception { - TestingCoordinatorProvider provider = new TestingCoordinatorProvider(); + TestingCoordinatorProvider provider = new TestingCoordinatorProvider(null); MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS); RecreateOnResetOperatorCoordinator coordinator = createCoordinator(provider, context); @@ -80,6 +86,9 @@ public class RecreateOnResetOperatorCoordinatorTest { byte[] stateToRestore = new byte[0]; coordinator.resetToCheckpoint(stateToRestore); + // Use the checkpoint to ensure all the previous method invocation has succeeded. + coordinator.waitForAllAsyncCallsFinish(); + assertTrue(contextBeforeReset.isQuiesced()); assertNull(internalCoordinatorBeforeReset.getLastRestoredCheckpointState()); @@ -87,6 +96,141 @@ public class RecreateOnResetOperatorCoordinatorTest { assertEquals(stateToRestore, internalCoordinatorAfterReset.getLastRestoredCheckpointState()); } + @Test + public void testResetToCheckpointTimeout() throws Exception { + final long closingTimeoutMs = 1L; + // Let the user coordinator block on close. + TestingCoordinatorProvider provider = new TestingCoordinatorProvider(new CountDownLatch(1)); + MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS); + RecreateOnResetOperatorCoordinator coordinator = + (RecreateOnResetOperatorCoordinator) provider.create(context, closingTimeoutMs); + + coordinator.resetToCheckpoint(new byte[0]); + CommonTestUtils.waitUtil( + context::isJobFailed, + Duration.ofSeconds(5), + "The job should fail due to resetToCheckpoint() timeout."); + } + + @Test + public void testMethodCallsOnLongResetToCheckpoint() throws Exception { + final long closingTimeoutMs = Long.MAX_VALUE; + final CountDownLatch blockOnCloseLatch = new CountDownLatch(1); + // Let the user coordinator block on close. + TestingCoordinatorProvider provider = new TestingCoordinatorProvider(blockOnCloseLatch); + MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS); + RecreateOnResetOperatorCoordinator coordinator = + (RecreateOnResetOperatorCoordinator) provider.create(context, closingTimeoutMs); + + // Set up the testing variables. + final byte[] restoredState = new byte[0]; + final TestingEvent testingEvent = new TestingEvent(); + final long completedCheckpointId = 1234L; + + // Reset the coordinator which closes the current internal coordinator + // and then create a new one. The closing of the current internal + // coordinator will block until the blockOnCloseLatch is pulled. + coordinator.resetToCheckpoint(restoredState); + + // The following method calls should be applied to the new internal + // coordinator asynchronously because the current coordinator has not + // been successfully closed yet. + coordinator.handleEventFromOperator(1, testingEvent); + coordinator.subtaskFailed(1, new Exception("Subtask Failure Exception.")); + coordinator.notifyCheckpointComplete(completedCheckpointId); + + // The new coordinator should not have been created because the resetToCheckpoint() + // should block on closing the current coordinator. + assertEquals(1, provider.getCreatedCoordinators().size()); + + // Now unblock the closing of the current coordinator. + blockOnCloseLatch.countDown(); + + // Take a checkpoint on the coordinator after reset. + CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>(); + coordinator.checkpointCoordinator(5678L, checkpointFuture); + coordinator.waitForAllAsyncCallsFinish(); + + // Verify that the methods calls have been made against the new coordinator. + TestingOperatorCoordinator internalCoordinatorAfterReset = getInternalCoordinator(coordinator); + // The internal coordinator after reset should have triggered a new checkpoint. + assertEquals(checkpointFuture, internalCoordinatorAfterReset.getLastTriggeredCheckpoint()); + // The internal coordinator after reset should be the second coordinator created by the provider. + assertEquals(provider.getCreatedCoordinators().get(1), internalCoordinatorAfterReset); + // The internal coordinator after reset should have been reset to the restored state. + assertEquals(restoredState, internalCoordinatorAfterReset.getLastRestoredCheckpointState()); + // The internal coordinator after reset should have received the testing event. + assertEquals(testingEvent, internalCoordinatorAfterReset.getNextReceivedOperatorEvent()); + // The internal coordinator after reset should have handled the failure of subtask 1. + assertEquals(Collections.singletonList(1), internalCoordinatorAfterReset.getFailedTasks()); + // The internal coordinator after reset should have the completedCheckpointId. + assertEquals(completedCheckpointId, internalCoordinatorAfterReset.getLastCheckpointComplete()); + + } + + @Test(timeout = 30000L) + public void testConsecutiveResetToCheckpoint() throws Exception { + final long closingTimeoutMs = Long.MAX_VALUE; + final int numResets = 1000; + // Let the user coordinator block on close. + TestingCoordinatorProvider provider = new TestingCoordinatorProvider(); + MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS); + RecreateOnResetOperatorCoordinator coordinator = + (RecreateOnResetOperatorCoordinator) provider.create(context, closingTimeoutMs); + + // Loop to get some interleaved method invocations on multiple instances + // of active coordinators. + for (int i = 0; i < numResets; i++) { + coordinator.handleEventFromOperator(1, new TestingEvent(i)); + coordinator.subtaskFailed(i, new Exception()); + CompletableFuture<byte[]> future = CompletableFuture.completedFuture(new byte[i]); + coordinator.checkpointCoordinator(i, future); + final int loop = i; + future.thenRun(() -> coordinator.notifyCheckpointComplete(loop)); + // The reset bytes has a length of i+1 here because this will be reset to the + // next internal coordinator. + coordinator.resetToCheckpoint(new byte[i + 1]); + } + + coordinator.waitForAllAsyncCallsFinish(); + + // Verify that the methods calls have been made against the coordinators. + for (TestingOperatorCoordinator internalCoordinator : provider.getCreatedCoordinators()) { + // The indexOfCoordinator is set to 0 by default because: + // 1. For the initial internal coordinator, its index is 0. + // 2. For all the subsequent internal coordinators, there are two cases: + // a. they have processed at least one method call. In that case the coordinator + // must have been restored to the given state. So the indexOfCoordinator will + // be updated correctly. + // b. no method call was processed. In this case the indexOfCoordinator does not + // matter because all the fields will either be empty or null. + int indexOfCoordinator = 0; + byte[] lastRestoredState = internalCoordinator.getLastRestoredCheckpointState(); + if (lastRestoredState != null) { + indexOfCoordinator = lastRestoredState.length; + } + TestingEvent testingEvent = (TestingEvent) internalCoordinator.getNextReceivedOperatorEvent(); + List<Integer> failedTasks = internalCoordinator.getFailedTasks(); + + assertTrue(testingEvent == null || testingEvent.getId() == indexOfCoordinator); + assertTrue(failedTasks.isEmpty() || (failedTasks.size() == 1 && failedTasks.get(0) == indexOfCoordinator)); + assertTrue(!internalCoordinator.hasCompleteCheckpoint() || + internalCoordinator.getLastCheckpointComplete() == indexOfCoordinator); + assertTrue(!internalCoordinator.hasTriggeredCheckpoint() || + internalCoordinator.getLastTriggeredCheckpoint().get().length == indexOfCoordinator); + } + coordinator.close(); + TestingOperatorCoordinator internalCoordinator = getInternalCoordinator(coordinator); + CommonTestUtils.waitUtil( + internalCoordinator::isClosed, + Duration.ofSeconds(5), + "Timed out when waiting for the coordinator to close."); + } + + public void testFailureInCreateCoordinator() { + + } + // --------------- private RecreateOnResetOperatorCoordinator createCoordinator( @@ -103,18 +247,47 @@ public class RecreateOnResetOperatorCoordinatorTest { private static class TestingCoordinatorProvider extends RecreateOnResetOperatorCoordinator.Provider { private static final long serialVersionUID = 4184184580789587013L; + private final CountDownLatch blockOnCloseLatch; + private final List<TestingOperatorCoordinator> createdCoordinators; public TestingCoordinatorProvider() { + this(null); + } + + public TestingCoordinatorProvider( + CountDownLatch blockOnCloseLatch) { super(OPERATOR_ID); + this.blockOnCloseLatch = blockOnCloseLatch; + this.createdCoordinators = new ArrayList<>(); } @Override protected OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) { - return new TestingOperatorCoordinator(context); + TestingOperatorCoordinator testingCoordinator = + new TestingOperatorCoordinator(context, blockOnCloseLatch); + createdCoordinators.add(testingCoordinator); + return testingCoordinator; + } + + private List<TestingOperatorCoordinator> getCreatedCoordinators() { + return createdCoordinators; } } private static class TestingEvent implements OperatorEvent { private static final long serialVersionUID = -3289352911927668275L; + private final int id; + + private TestingEvent() { + this(-1); + } + + private TestingEvent(int id) { + this.id = id; + } + + private int getId() { + return id; + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java index e4d8b04..5c5b91f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java @@ -24,9 +24,10 @@ import org.apache.flink.runtime.util.SerializableFunction; import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Collection; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; /** @@ -38,6 +39,8 @@ class TestingOperatorCoordinator implements OperatorCoordinator { private final ArrayList<Integer> failedTasks = new ArrayList<>(); + private final CountDownLatch blockOnCloseLatch; + @Nullable private byte[] lastRestoredCheckpointState; @@ -45,13 +48,23 @@ class TestingOperatorCoordinator implements OperatorCoordinator { private BlockingQueue<Long> lastCheckpointComplete; + private BlockingQueue<OperatorEvent> receivedOperatorEvents; + private boolean started; private boolean closed; public TestingOperatorCoordinator(OperatorCoordinator.Context context) { + this(context, null); + } + + public TestingOperatorCoordinator( + OperatorCoordinator.Context context, + CountDownLatch blockOnCloseLatch) { this.context = context; this.triggeredCheckpoints = new LinkedBlockingQueue<>(); this.lastCheckpointComplete = new LinkedBlockingQueue<>(); + this.receivedOperatorEvents = new LinkedBlockingQueue<>(); + this.blockOnCloseLatch = blockOnCloseLatch; } // ------------------------------------------------------------------------ @@ -62,12 +75,17 @@ class TestingOperatorCoordinator implements OperatorCoordinator { } @Override - public void close() { + public void close() throws InterruptedException { closed = true; + if (blockOnCloseLatch != null) { + blockOnCloseLatch.await(); + } } @Override - public void handleEventFromOperator(int subtask, OperatorEvent event) {} + public void handleEventFromOperator(int subtask, OperatorEvent event) { + receivedOperatorEvents.add(event); + } @Override public void subtaskFailed(int subtask, @Nullable Throwable reason) { @@ -104,7 +122,7 @@ class TestingOperatorCoordinator implements OperatorCoordinator { return closed; } - public Collection<Integer> getFailedTasks() { + public List<Integer> getFailedTasks() { return failedTasks; } @@ -125,6 +143,11 @@ class TestingOperatorCoordinator implements OperatorCoordinator { return lastCheckpointComplete.take(); } + @Nullable + public OperatorEvent getNextReceivedOperatorEvent() { + return receivedOperatorEvents.poll(); + } + public boolean hasCompleteCheckpoint() throws InterruptedException { return !lastCheckpointComplete.isEmpty(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java index 6f6b5ba..0ddf971 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java @@ -74,11 +74,13 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase { "only be reset to a checkpoint before it starts.", OPERATOR_NAME)); } - @Test + @Test(timeout = 10000L) public void testStart() throws Exception { assertFalse(enumerator.started()); sourceCoordinator.start(); - assertTrue(enumerator.started()); + while (!enumerator.started()) { + Thread.sleep(1); + } } @Test
