This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0f19c2472c54aac97e4067f5398731ab90036d1a Author: Jiangjie (Becket) Qin <[email protected]> AuthorDate: Thu Feb 10 15:13:55 2022 +0800 [FLINK-24607] Make OperatorCoordinator closure more robust. --- .../RecreateOnResetOperatorCoordinator.java | 12 ++++- .../source/coordinator/ExecutorNotifier.java | 21 +------- .../source/coordinator/SourceCoordinator.java | 36 ++++--------- .../coordinator/SourceCoordinatorContext.java | 24 +++++---- .../coordinator/SourceCoordinatorProvider.java | 11 +--- .../source/coordinator/ExecutorNotifierTest.java | 14 ++--- .../source/coordinator/SourceCoordinatorTest.java | 63 ++++++++++++++++++++-- .../coordinator/SourceCoordinatorTestBase.java | 1 - 8 files changed, 102 insertions(+), 80 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java index 5c660d0..ffab3ff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java @@ -128,8 +128,16 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator { // capture the status whether the coordinator was started when this method was called final boolean wasStarted = this.started; - closingFuture.thenRun( - () -> { + closingFuture.whenComplete( + (ignored, e) -> { + if (e != null) { + LOG.warn( + String.format( + "Received exception when closing " + + "operator coordinator for %s.", + oldCoordinator.operatorId), + e); + } if (!closed) { // The previous coordinator has closed. Create a new one. newCoordinator.createNewInternalCoordinator(context, provider); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java index e52f6cd..fe4cf8a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java @@ -25,23 +25,20 @@ import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; /** * This class is used to coordinate between two components, where one component has an executor * following the mailbox model and the other component notifies it when needed. */ -public class ExecutorNotifier implements AutoCloseable { +public class ExecutorNotifier { private static final Logger LOG = LoggerFactory.getLogger(ExecutorNotifier.class); private final ScheduledExecutorService workerExecutor; private final Executor executorToNotify; - private final AtomicBoolean closed; public ExecutorNotifier(ScheduledExecutorService workerExecutor, Executor executorToNotify) { this.executorToNotify = executorToNotify; this.workerExecutor = workerExecutor; - this.closed = new AtomicBoolean(false); } /** @@ -140,20 +137,4 @@ public class ExecutorNotifier implements AutoCloseable { periodMs, TimeUnit.MILLISECONDS); } - - /** - * Close the executor notifier. This is a blocking call which waits for all the async calls to - * finish before it returns. - * - * @throws InterruptedException when interrupted during closure. - */ - public void close() throws InterruptedException { - if (!closed.compareAndSet(false, true)) { - LOG.debug("The executor notifier has been closed."); - return; - } - // Shutdown the worker executor, so no more worker tasks can run. - workerExecutor.shutdownNow(); - workerExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java index b2916dd..c1c61b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java @@ -61,12 +61,13 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static java.util.Arrays.asList; import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readAndVerifyCoordinatorSerdeVersion; import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readBytes; import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.writeCoordinatorSerdeVersion; +import static org.apache.flink.util.IOUtils.closeAll; import static org.apache.flink.util.Preconditions.checkState; /** @@ -93,8 +94,6 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> /** The name of the operator this SourceCoordinator is associated with. */ private final String operatorName; - /** A single-thread executor to handle all the changes to the coordinator. */ - private final ScheduledExecutorService coordinatorExecutor; /** The Source that is associated with this SourceCoordinator. */ private final Source<?, SplitT, EnumChkT> source; /** The serializer that handles the serde of the SplitEnumerator checkpoints. */ @@ -113,13 +112,11 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> public SourceCoordinator( String operatorName, - ScheduledExecutorService coordinatorExecutor, Source<?, SplitT, EnumChkT> source, SourceCoordinatorContext<SplitT> context, CoordinatorStore coordinatorStore) { this( operatorName, - coordinatorExecutor, source, context, coordinatorStore, @@ -128,13 +125,11 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> public SourceCoordinator( String operatorName, - ScheduledExecutorService coordinatorExecutor, Source<?, SplitT, EnumChkT> source, SourceCoordinatorContext<SplitT> context, CoordinatorStore coordinatorStore, WatermarkAlignmentParams watermarkAlignmentParams) { this.operatorName = operatorName; - this.coordinatorExecutor = coordinatorExecutor; this.source = source; this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer(); this.context = context; @@ -144,11 +139,12 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> if (watermarkAlignmentParams.isEnabled()) { coordinatorStore.putIfAbsent( watermarkAlignmentParams.getWatermarkGroup(), new WatermarkAggregator<>()); - coordinatorExecutor.scheduleAtFixedRate( - this::announceCombinedWatermark, - watermarkAlignmentParams.getUpdateInterval(), - watermarkAlignmentParams.getUpdateInterval(), - TimeUnit.MILLISECONDS); + context.getCoordinatorExecutor() + .scheduleAtFixedRate( + this::announceCombinedWatermark, + watermarkAlignmentParams.getUpdateInterval(), + watermarkAlignmentParams.getUpdateInterval(), + TimeUnit.MILLISECONDS); } } @@ -216,18 +212,8 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> @Override public void close() throws Exception { LOG.info("Closing SourceCoordinator for source {}.", operatorName); - try { - if (started) { - context.close(); - if (enumerator != null) { - enumerator.close(); - } - } - } finally { - coordinatorExecutor.shutdownNow(); - // We do not expect this to actually block for long. At this point, there should - // be very few task running in the executor, if any. - coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + if (started) { + closeAll(asList(context, enumerator), Throwable.class); } LOG.info("Source coordinator for source {} closed.", operatorName); } @@ -414,7 +400,7 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> return; } - coordinatorExecutor.execute( + context.runInCoordinatorThread( () -> { try { action.run(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java index 83823f7..064a743 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java @@ -42,6 +42,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; @@ -50,12 +51,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; +import static org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.shutdownExecutorForcefully; + /** * A context class for the {@link OperatorCoordinator}. Compared with {@link SplitEnumeratorContext} * this class allows interaction with state and sending {@link OperatorEvent} to the SourceOperator @@ -82,7 +83,8 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit> private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinatorContext.class); - private final ExecutorService coordinatorExecutor; + private final ScheduledExecutorService workerExecutor; + private final ScheduledExecutorService coordinatorExecutor; private final ExecutorNotifier notifier; private final OperatorCoordinator.Context operatorCoordinatorContext; private final SimpleVersionedSerializer<SplitT> splitSerializer; @@ -95,13 +97,12 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit> private volatile boolean closed; public SourceCoordinatorContext( - ExecutorService coordinatorExecutor, SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory, int numWorkerThreads, OperatorCoordinator.Context operatorCoordinatorContext, SimpleVersionedSerializer<SplitT> splitSerializer) { this( - coordinatorExecutor, + Executors.newScheduledThreadPool(1, coordinatorThreadFactory), Executors.newScheduledThreadPool( numWorkerThreads, new ExecutorThreadFactory( @@ -115,12 +116,13 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit> // Package private method for unit test. @VisibleForTesting SourceCoordinatorContext( - ExecutorService coordinatorExecutor, + ScheduledExecutorService coordinatorExecutor, ScheduledExecutorService workerExecutor, SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory, OperatorCoordinator.Context operatorCoordinatorContext, SimpleVersionedSerializer<SplitT> splitSerializer, SplitAssignmentTracker<SplitT> splitAssignmentTracker) { + this.workerExecutor = workerExecutor; this.coordinatorExecutor = coordinatorExecutor; this.coordinatorThreadFactory = coordinatorThreadFactory; this.operatorCoordinatorContext = operatorCoordinatorContext; @@ -173,6 +175,10 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit> String.format("Failed to send event %s to subtask %d", event, subtaskId)); } + ScheduledExecutorService getCoordinatorExecutor() { + return coordinatorExecutor; + } + @Override public int currentParallelism() { return operatorCoordinatorContext.currentParallelism(); @@ -259,9 +265,9 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit> @Override public void close() throws InterruptedException { closed = true; - notifier.close(); - coordinatorExecutor.shutdown(); - coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + // Close quietly so the closing sequence will be executed completely. + shutdownExecutorForcefully(workerExecutor, Duration.ofNanos(Long.MAX_VALUE)); + shutdownExecutorForcefully(coordinatorExecutor, Duration.ofNanos(Long.MAX_VALUE)); } // --------- Package private additional methods for the SourceCoordinator ------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java index b74c007..55df066 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java @@ -31,8 +31,6 @@ import org.apache.flink.util.FatalExitExceptionHandler; import javax.annotation.Nullable; import java.util.concurrent.Callable; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.function.BiConsumer; @@ -75,20 +73,13 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> CoordinatorExecutorThreadFactory coordinatorThreadFactory = new CoordinatorExecutorThreadFactory( coordinatorThreadName, context.getUserCodeClassloader()); - ScheduledExecutorService coordinatorExecutor = - Executors.newScheduledThreadPool(1, coordinatorThreadFactory); SimpleVersionedSerializer<SplitT> splitSerializer = source.getSplitSerializer(); SourceCoordinatorContext<SplitT> sourceCoordinatorContext = new SourceCoordinatorContext<>( - coordinatorExecutor, - coordinatorThreadFactory, - numWorkerThreads, - context, - splitSerializer); + coordinatorThreadFactory, numWorkerThreads, context, splitSerializer); return new SourceCoordinator<>( operatorName, - coordinatorExecutor, source, sourceCoordinatorContext, context.getCoordinatorStore(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java index 8f7a806..76a1b54 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java @@ -22,6 +22,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -29,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.shutdownExecutorForcefully; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -62,8 +64,8 @@ public class ExecutorNotifierTest { @After public void tearDown() throws InterruptedException { - notifier.close(); - closeExecutorToNotify(); + shutdownExecutorForcefully(workerExecutor, Duration.ofNanos(Long.MAX_VALUE)); + shutdownExecutorForcefully(executorToNotify, Duration.ofNanos(Long.MAX_VALUE)); } @Test @@ -77,7 +79,6 @@ public class ExecutorNotifierTest { latch.countDown(); }); latch.await(); - closeExecutorToNotify(); assertEquals(1234, result.get()); } @@ -110,7 +111,6 @@ public class ExecutorNotifierTest { throw exception2; }); latch.await(); - closeExecutorToNotify(); // The uncaught exception handler may fire after the executor has shutdown. // We need to wait on the countdown latch here. exceptionInHandlerLatch.await(10000L, TimeUnit.MILLISECONDS); @@ -128,15 +128,9 @@ public class ExecutorNotifierTest { throw exception; }); latch.await(); - closeExecutorToNotify(); // The uncaught exception handler may fire after the executor has shutdown. // We need to wait on the countdown latch here. exceptionInHandlerLatch.await(10000L, TimeUnit.MILLISECONDS); assertEquals(exception, exceptionInHandler); } - - private void closeExecutorToNotify() throws InterruptedException { - executorToNotify.shutdown(); - executorToNotify.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java index 71df249..12d29bf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java @@ -34,10 +34,12 @@ import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.ComponentClosingUtils; import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl; import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.source.event.SourceEventWrapper; +import org.apache.flink.util.function.ThrowingRunnable; import org.junit.Test; @@ -53,6 +55,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeoutException; import java.util.function.Supplier; import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil; @@ -244,7 +248,6 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase { final SourceCoordinator<?, ?> coordinator = new SourceCoordinator<>( OPERATOR_NAME, - coordinatorExecutor, new EnumeratorCreatingSource<>(() -> splitEnumerator), context, new CoordinatorStoreImpl(), @@ -266,7 +269,6 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase { final SourceCoordinator<?, ?> coordinator = new SourceCoordinator<>( OPERATOR_NAME, - coordinatorExecutor, new EnumeratorCreatingSource<>( () -> { throw failureReason; @@ -296,7 +298,6 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase { final SourceCoordinator<?, ?> coordinator = new SourceCoordinator<>( OPERATOR_NAME, - coordinatorExecutor, new EnumeratorCreatingSource<>(() -> splitEnumerator), context, new CoordinatorStoreImpl(), @@ -314,6 +315,62 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase { } @Test + public void testBlockOnClose() throws Exception { + // It is possible that the split enumerator submits some heavy-duty work to the + // coordinator executor which blocks the coordinator closure. + final CountDownLatch latch = new CountDownLatch(1); + try (final MockSplitEnumeratorContext<MockSourceSplit> enumeratorContext = + new MockSplitEnumeratorContext<>(1); + final MockSplitEnumerator splitEnumerator = + new MockSplitEnumerator(1, enumeratorContext) { + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + context.callAsync( + () -> 1L, + (ignored, t) -> { + latch.countDown(); + // Submit a callable that will never return. + try { + Thread.sleep(Long.MAX_VALUE); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + }; + final SourceCoordinator<?, ?> coordinator = + new SourceCoordinator<>( + OPERATOR_NAME, + new EnumeratorCreatingSource<>(() -> splitEnumerator), + context, + new CoordinatorStoreImpl())) { + + coordinator.start(); + coordinator.handleEventFromOperator(1, new SourceEventWrapper(new SourceEvent() {})); + // Wait until the coordinator executor blocks. + latch.await(); + + CompletableFuture<?> future = + ComponentClosingUtils.closeAsyncWithTimeout( + "testBlockOnClose", + (ThrowingRunnable<Exception>) coordinator::close, + Duration.ofMillis(1)); + + future.exceptionally( + e -> { + assertTrue(e instanceof TimeoutException); + return null; + }) + .get(); + + waitUtil( + splitEnumerator::closed, + Duration.ofSeconds(5), + "Split enumerator was not closed in 5 seconds."); + } + } + + @Test public void testUserClassLoaderWhenCreatingNewEnumerator() throws Exception { final ClassLoader testClassLoader = new URLClassLoader(new URL[0]); final OperatorCoordinator.Context context = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java index f59cb2d..d9f6c32 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java @@ -161,7 +161,6 @@ public abstract class SourceCoordinatorTestBase { return new SourceCoordinator<>( OPERATOR_NAME, - coordinatorExecutor, mockSource, getNewSourceCoordinatorContext(), new CoordinatorStoreImpl(),
