This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 79cb1a140991d13b2010d10c86e054fcced977c4 Author: Chesnay Schepler <[email protected]> AuthorDate: Tue May 19 16:58:22 2020 +0200 [FLINK-17558][netty] Release partitions asynchronously --- .../io/network/NettyShuffleEnvironment.java | 15 ++++-- .../io/network/NettyShuffleServiceFactory.java | 31 ++++++++++-- .../runtime/shuffle/ShuffleEnvironmentContext.java | 11 ++++- .../runtime/taskexecutor/TaskManagerRunner.java | 7 ++- .../runtime/taskexecutor/TaskManagerServices.java | 19 ++++---- .../TaskManagerServicesConfiguration.java | 18 +++++-- .../io/network/NettyShuffleEnvironmentBuilder.java | 21 ++++++++- .../io/network/NettyShuffleEnvironmentTest.java | 26 ++++++++++ .../TaskExecutorLocalStateStoresManagerTest.java | 2 +- .../TaskExecutorPartitionLifecycleTest.java | 55 ++++++++++++++++++++++ 10 files changed, 179 insertions(+), 26 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java index 8938f5e..61107ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java @@ -57,6 +57,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_INPUT; import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_OUTPUT; @@ -95,6 +96,8 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti private final SingleInputGateFactory singleInputGateFactory; + private final Executor ioExecutor; + private boolean isClosed; NettyShuffleEnvironment( @@ -105,7 +108,8 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti ResultPartitionManager resultPartitionManager, FileChannelManager fileChannelManager, ResultPartitionFactory resultPartitionFactory, - SingleInputGateFactory singleInputGateFactory) { + SingleInputGateFactory singleInputGateFactory, + Executor ioExecutor) { this.taskExecutorResourceId = taskExecutorResourceId; this.config = config; this.networkBufferPool = networkBufferPool; @@ -115,6 +119,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti this.fileChannelManager = fileChannelManager; this.resultPartitionFactory = resultPartitionFactory; this.singleInputGateFactory = singleInputGateFactory; + this.ioExecutor = ioExecutor; this.isClosed = false; } @@ -149,9 +154,11 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti @Override public void releasePartitionsLocally(Collection<ResultPartitionID> partitionIds) { - for (ResultPartitionID partitionId : partitionIds) { - resultPartitionManager.releasePartition(partitionId, null); - } + ioExecutor.execute(() -> { + for (ResultPartitionID partitionId : partitionIds) { + resultPartitionManager.releasePartition(partitionId, null); + } + }); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java index 9c4f95b..87ce8cd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java @@ -38,6 +38,8 @@ import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext; import org.apache.flink.runtime.shuffle.ShuffleServiceFactory; import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration; +import java.util.concurrent.Executor; + import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.registerShuffleMetrics; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -65,7 +67,8 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh networkConfig, shuffleEnvironmentContext.getTaskExecutorResourceId(), shuffleEnvironmentContext.getEventPublisher(), - shuffleEnvironmentContext.getParentMetricGroup()); + shuffleEnvironmentContext.getParentMetricGroup(), + shuffleEnvironmentContext.getIoExecutor()); } @VisibleForTesting @@ -73,16 +76,33 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh NettyShuffleEnvironmentConfiguration config, ResourceID taskExecutorResourceId, TaskEventPublisher taskEventPublisher, - MetricGroup metricGroup) { + MetricGroup metricGroup, + Executor ioExecutor) { + return createNettyShuffleEnvironment( + config, + taskExecutorResourceId, + taskEventPublisher, + new ResultPartitionManager(), + metricGroup, + ioExecutor); + } + + @VisibleForTesting + static NettyShuffleEnvironment createNettyShuffleEnvironment( + NettyShuffleEnvironmentConfiguration config, + ResourceID taskExecutorResourceId, + TaskEventPublisher taskEventPublisher, + ResultPartitionManager resultPartitionManager, + MetricGroup metricGroup, + Executor ioExecutor) { checkNotNull(config); checkNotNull(taskExecutorResourceId); checkNotNull(taskEventPublisher); + checkNotNull(resultPartitionManager); checkNotNull(metricGroup); NettyConfig nettyConfig = config.nettyConfig(); - ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); - FileChannelManager fileChannelManager = new FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX); ConnectionManager connectionManager = nettyConfig != null ? @@ -126,6 +146,7 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh resultPartitionManager, fileChannelManager, resultPartitionFactory, - singleInputGateFactory); + singleInputGateFactory, + ioExecutor); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java index 7863f18..3116372 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.io.network.TaskEventPublisher; import java.net.InetAddress; +import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -40,6 +41,8 @@ public class ShuffleEnvironmentContext { private final TaskEventPublisher eventPublisher; private final MetricGroup parentMetricGroup; + private final Executor ioExecutor; + public ShuffleEnvironmentContext( Configuration configuration, ResourceID taskExecutorResourceId, @@ -47,7 +50,8 @@ public class ShuffleEnvironmentContext { boolean localCommunicationOnly, InetAddress hostAddress, TaskEventPublisher eventPublisher, - MetricGroup parentMetricGroup) { + MetricGroup parentMetricGroup, + Executor ioExecutor) { this.configuration = checkNotNull(configuration); this.taskExecutorResourceId = checkNotNull(taskExecutorResourceId); this.networkMemorySize = networkMemorySize; @@ -55,6 +59,7 @@ public class ShuffleEnvironmentContext { this.hostAddress = checkNotNull(hostAddress); this.eventPublisher = checkNotNull(eventPublisher); this.parentMetricGroup = checkNotNull(parentMetricGroup); + this.ioExecutor = ioExecutor; } public Configuration getConfiguration() { @@ -84,4 +89,8 @@ public class ShuffleEnvironmentContext { public MetricGroup getParentMetricGroup() { return parentMetricGroup; } + + public Executor getIoExecutor() { + return ioExecutor; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 88786bf..9772700 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -30,6 +30,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobCacheService; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.entrypoint.FlinkParseException; @@ -366,11 +367,15 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync resourceID, taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval()); + final ExecutorService ioExecutor = Executors.newCachedThreadPool( + taskManagerServicesConfiguration.getNumIoThreads(), + new ExecutorThreadFactory("flink-taskexecutor-io")); + TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration, blobCacheService.getPermanentBlobService(), taskManagerMetricGroup.f1, - rpcService.getExecutor()); // TODO replace this later with some dedicated executor for io. + ioExecutor); TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration, taskExecutorResourceSpec, externalAddress); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index ee737c1..d6aa2c4 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -38,7 +38,6 @@ import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl; import org.apache.flink.runtime.taskexecutor.slot.TimerService; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; -import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; @@ -50,7 +49,6 @@ import java.io.File; import java.io.IOException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; /** @@ -247,7 +245,7 @@ public class TaskManagerServices { * @param taskManagerServicesConfiguration task manager configuration * @param permanentBlobService permanentBlobService used by the services * @param taskManagerMetricGroup metric group of the task manager - * @param taskIOExecutor executor for async IO operations + * @param ioExecutor executor for async IO operations * @return task manager components * @throws Exception */ @@ -255,7 +253,7 @@ public class TaskManagerServices { TaskManagerServicesConfiguration taskManagerServicesConfiguration, PermanentBlobService permanentBlobService, MetricGroup taskManagerMetricGroup, - Executor taskIOExecutor) throws Exception { + ExecutorService ioExecutor) throws Exception { // pre-start checks checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths()); @@ -268,7 +266,8 @@ public class TaskManagerServices { final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment( taskManagerServicesConfiguration, taskEventDispatcher, - taskManagerMetricGroup); + taskManagerMetricGroup, + ioExecutor); final int listeningDataPort = shuffleEnvironment.start(); final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration); @@ -306,9 +305,7 @@ public class TaskManagerServices { final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager( taskManagerServicesConfiguration.isLocalRecoveryEnabled(), stateRootDirectoryFiles, - taskIOExecutor); - - final ExecutorService ioExecutor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory("taskexecutor-io")); + ioExecutor); final LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager( permanentBlobService, @@ -351,7 +348,8 @@ public class TaskManagerServices { private static ShuffleEnvironment<?, ?> createShuffleEnvironment( TaskManagerServicesConfiguration taskManagerServicesConfiguration, TaskEventDispatcher taskEventDispatcher, - MetricGroup taskManagerMetricGroup) throws FlinkException { + MetricGroup taskManagerMetricGroup, + Executor ioExecutor) throws FlinkException { final ShuffleEnvironmentContext shuffleEnvironmentContext = new ShuffleEnvironmentContext( taskManagerServicesConfiguration.getConfiguration(), @@ -360,7 +358,8 @@ public class TaskManagerServices { taskManagerServicesConfiguration.isLocalCommunicationOnly(), taskManagerServicesConfiguration.getBindAddress(), taskEventDispatcher, - taskManagerMetricGroup); + taskManagerMetricGroup, + ioExecutor); return ShuffleServiceLoader .loadShuffleServiceFactory(taskManagerServicesConfiguration.getConfiguration()) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index bb50b62..5d126ae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration; +import org.apache.flink.runtime.util.ClusterEntrypointUtils; import org.apache.flink.runtime.util.ConfigurationParserUtils; import org.apache.flink.util.NetUtils; @@ -84,7 +85,9 @@ public class TaskManagerServicesConfiguration { private final String[] alwaysParentFirstLoaderPatterns; - public TaskManagerServicesConfiguration( + private final int numIoThreads; + + private TaskManagerServicesConfiguration( Configuration configuration, ResourceID resourceID, String externalAddress, @@ -102,7 +105,8 @@ public class TaskManagerServicesConfiguration { RetryingRegistrationConfiguration retryingRegistrationConfiguration, Optional<Time> systemResourceMetricsProbingInterval, FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder, - String[] alwaysParentFirstLoaderPatterns) { + String[] alwaysParentFirstLoaderPatterns, + int numIoThreads) { this.configuration = checkNotNull(configuration); this.resourceID = checkNotNull(resourceID); @@ -121,6 +125,7 @@ public class TaskManagerServicesConfiguration { this.taskExecutorResourceSpec = taskExecutorResourceSpec; this.classLoaderResolveOrder = classLoaderResolveOrder; this.alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderPatterns; + this.numIoThreads = numIoThreads; checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " + "service shutdown timeout must be greater or equal to 0."); @@ -215,6 +220,10 @@ public class TaskManagerServicesConfiguration { return alwaysParentFirstLoaderPatterns; } + public int getNumIoThreads() { + return numIoThreads; + } + // -------------------------------------------------------------------------------------------- // Parsing of Flink configuration // -------------------------------------------------------------------------------------------- @@ -262,6 +271,8 @@ public class TaskManagerServicesConfiguration { final String[] alwaysParentFirstLoaderPatterns = CoreOptions.getParentFirstLoaderPatterns(configuration); + final int numIoThreads = ClusterEntrypointUtils.getPoolSize(configuration); + return new TaskManagerServicesConfiguration( configuration, resourceID, @@ -280,6 +291,7 @@ public class TaskManagerServicesConfiguration { retryingRegistrationConfiguration, ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration), FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder), - alwaysParentFirstLoaderPatterns); + alwaysParentFirstLoaderPatterns, + numIoThreads); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java index 5c377d2..ab999fc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java @@ -20,13 +20,16 @@ package org.apache.flink.runtime.io.network; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.io.network.netty.NettyConfig; import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration; import org.apache.flink.runtime.util.EnvironmentInformation; import java.time.Duration; +import java.util.concurrent.Executor; /** * Builder for the {@link NettyShuffleEnvironment}. @@ -63,6 +66,10 @@ public class NettyShuffleEnvironmentBuilder { private MetricGroup metricGroup = UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(); + private ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); + + private Executor ioExecutor = Executors.directExecutor(); + public NettyShuffleEnvironmentBuilder setTaskManagerLocation(ResourceID taskManagerLocation) { this.taskManagerLocation = taskManagerLocation; return this; @@ -123,6 +130,16 @@ public class NettyShuffleEnvironmentBuilder { return this; } + public NettyShuffleEnvironmentBuilder setResultPartitionManager(ResultPartitionManager resultPartitionManager) { + this.resultPartitionManager = resultPartitionManager; + return this; + } + + public NettyShuffleEnvironmentBuilder setIoExecutor(Executor ioExecutor) { + this.ioExecutor = ioExecutor; + return this; + } + public NettyShuffleEnvironment build() { return NettyShuffleServiceFactory.createNettyShuffleEnvironment( new NettyShuffleEnvironmentConfiguration( @@ -143,6 +160,8 @@ public class NettyShuffleEnvironmentBuilder { maxBuffersPerChannel), taskManagerLocation, new TaskEventDispatcher(), - metricGroup); + resultPartitionManager, + metricGroup, + ioExecutor); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java index 826f15c..62aba7e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java @@ -19,10 +19,13 @@ package org.apache.flink.runtime.io.network; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.core.testutils.BlockerSync; import org.apache.flink.runtime.io.disk.FileChannelManager; import org.apache.flink.runtime.io.disk.FileChannelManagerImpl; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder; @@ -41,6 +44,8 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.Executors; import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager; import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition; @@ -100,6 +105,27 @@ public class NettyShuffleEnvironmentTest extends TestLogger { testRegisterTaskWithLimitedBuffers(bufferCount); } + @Test + public void testSlowIODoesNotBlockRelease() throws Exception { + BlockerSync sync = new BlockerSync(); + ResultPartitionManager blockingResultPartitionManager = new ResultPartitionManager() { + @Override + public void releasePartition(ResultPartitionID partitionId, Throwable cause) { + sync.blockNonInterruptible(); + super.releasePartition(partitionId, cause); + } + }; + + NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder() + .setResultPartitionManager(blockingResultPartitionManager) + .setIoExecutor(Executors.newFixedThreadPool(1)) + .build(); + + shuffleEnvironment.releasePartitionsLocally(Collections.singleton(new ResultPartitionID())); + sync.awaitBlocker(); + sync.releaseBlocker(); + } + private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Exception { final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder() .setNumNetworkBuffers(bufferPoolSize) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java index 0560184..958b92c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java @@ -227,6 +227,6 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger { config, VoidPermanentBlobService.INSTANCE, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - Executors.directExecutor()); + Executors.newDirectExecutorService()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java index b538b36..9a3afa1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java @@ -38,11 +38,15 @@ import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder; import org.apache.flink.runtime.io.network.partition.PartitionTestUtils; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionInfo; import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker; +import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl; import org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; @@ -63,6 +67,7 @@ import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils; import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.testutils.executor.TestExecutorResource; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; import org.apache.flink.util.function.TriConsumer; @@ -71,6 +76,7 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -84,6 +90,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.StreamSupport; @@ -110,6 +117,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { @Rule public final TemporaryFolder tmp = new TemporaryFolder(); + @ClassRule + public static final TestExecutorResource TEST_EXECUTOR_SERVICE_RESOURCE = + new TestExecutorResource(() -> java.util.concurrent.Executors.newFixedThreadPool(1)); + @Before public void setup() { haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever); @@ -268,6 +279,50 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { ); } + @Test + public void testBlockingLocalPartitionReleaseDoesNotBlockTaskExecutor() throws Exception { + BlockerSync sync = new BlockerSync(); + ResultPartitionManager blockingResultPartitionManager = new ResultPartitionManager() { + @Override + public void releasePartition(ResultPartitionID partitionId, Throwable cause) { + sync.blockNonInterruptible(); + super.releasePartition(partitionId, cause); + } + }; + + NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder() + .setResultPartitionManager(blockingResultPartitionManager) + .setIoExecutor(TEST_EXECUTOR_SERVICE_RESOURCE.getExecutor()) + .build(); + + final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>(); + final TaskExecutorPartitionTracker partitionTracker = new TaskExecutorPartitionTrackerImpl(shuffleEnvironment) { + @Override + public void startTrackingPartition(JobID producingJobId, TaskExecutorPartitionInfo partitionInfo) { + super.startTrackingPartition(producingJobId, partitionInfo); + startTrackingFuture.complete(partitionInfo.getResultPartitionId()); + } + }; + + try { + internalTestPartitionRelease( + partitionTracker, + shuffleEnvironment, + startTrackingFuture, + (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway) -> { + final IntermediateDataSetID dataSetId = resultPartitionDeploymentDescriptor.getResultId(); + + taskExecutorGateway.releaseClusterPartitions(Collections.singleton(dataSetId), timeout); + + // execute some operation to check whether the TaskExecutor is blocked + taskExecutorGateway.canBeReleased().get(5, TimeUnit.SECONDS); + } + ); + } finally { + sync.releaseBlocker(); + } + } + private void testPartitionRelease(PartitionTrackerSetup partitionTrackerSetup, TestAction testAction) throws Exception { final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker(); final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
