This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit c6c0839d40571bb9ad47b8dbbec0b0ea14615372 Author: Chesnay Schepler <[email protected]> AuthorDate: Tue May 19 16:58:22 2020 +0200 [FLINK-17558][netty] Release partitions asynchronously --- .../io/network/NettyShuffleEnvironment.java | 15 ++++-- .../io/network/NettyShuffleServiceFactory.java | 31 +++++++++++-- .../runtime/shuffle/ShuffleEnvironmentContext.java | 11 ++++- .../runtime/taskexecutor/TaskManagerRunner.java | 7 ++- .../runtime/taskexecutor/TaskManagerServices.java | 16 ++++--- .../TaskManagerServicesConfiguration.java | 16 ++++++- .../io/network/NettyShuffleEnvironmentBuilder.java | 21 ++++++++- .../io/network/NettyShuffleEnvironmentTest.java | 26 +++++++++++ .../TaskExecutorLocalStateStoresManagerTest.java | 2 +- .../TaskExecutorPartitionLifecycleTest.java | 53 ++++++++++++++++++++++ 10 files changed, 177 insertions(+), 21 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java index 2d2d6f3..5e7c119 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java @@ -56,6 +56,7 @@ import java.util.Collection; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_INPUT; import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_OUTPUT; @@ -94,6 +95,8 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti private final SingleInputGateFactory singleInputGateFactory; + private final Executor ioExecutor; + private boolean isClosed; NettyShuffleEnvironment( @@ -104,7 +107,8 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti ResultPartitionManager resultPartitionManager, FileChannelManager fileChannelManager, ResultPartitionFactory resultPartitionFactory, - SingleInputGateFactory singleInputGateFactory) { + SingleInputGateFactory singleInputGateFactory, + Executor ioExecutor) { this.taskExecutorResourceId = taskExecutorResourceId; this.config = config; this.networkBufferPool = networkBufferPool; @@ -114,6 +118,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti this.fileChannelManager = fileChannelManager; this.resultPartitionFactory = resultPartitionFactory; this.singleInputGateFactory = singleInputGateFactory; + this.ioExecutor = ioExecutor; this.isClosed = false; } @@ -148,9 +153,11 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti @Override public void releasePartitionsLocally(Collection<ResultPartitionID> partitionIds) { - for (ResultPartitionID partitionId : partitionIds) { - resultPartitionManager.releasePartition(partitionId, null); - } + ioExecutor.execute(() -> { + for (ResultPartitionID partitionId : partitionIds) { + resultPartitionManager.releasePartition(partitionId, null); + } + }); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java index afbaba2..f065122 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java @@ -38,6 +38,8 @@ import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext; import org.apache.flink.runtime.shuffle.ShuffleServiceFactory; import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration; +import java.util.concurrent.Executor; + import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.registerShuffleMetrics; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -65,7 +67,8 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh networkConfig, shuffleEnvironmentContext.getTaskExecutorResourceId(), shuffleEnvironmentContext.getEventPublisher(), - shuffleEnvironmentContext.getParentMetricGroup()); + shuffleEnvironmentContext.getParentMetricGroup(), + shuffleEnvironmentContext.getIoExecutor()); } @VisibleForTesting @@ -73,16 +76,33 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh NettyShuffleEnvironmentConfiguration config, ResourceID taskExecutorResourceId, TaskEventPublisher taskEventPublisher, - MetricGroup metricGroup) { + MetricGroup metricGroup, + Executor ioExecutor) { + return createNettyShuffleEnvironment( + config, + taskExecutorResourceId, + taskEventPublisher, + new ResultPartitionManager(), + metricGroup, + ioExecutor); + } + + @VisibleForTesting + static NettyShuffleEnvironment createNettyShuffleEnvironment( + NettyShuffleEnvironmentConfiguration config, + ResourceID taskExecutorResourceId, + TaskEventPublisher taskEventPublisher, + ResultPartitionManager resultPartitionManager, + MetricGroup metricGroup, + Executor ioExecutor) { checkNotNull(config); checkNotNull(taskExecutorResourceId); checkNotNull(taskEventPublisher); + checkNotNull(resultPartitionManager); checkNotNull(metricGroup); NettyConfig nettyConfig = config.nettyConfig(); - ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); - FileChannelManager fileChannelManager = new FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX); ConnectionManager connectionManager = nettyConfig != null ? @@ -125,6 +145,7 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh resultPartitionManager, fileChannelManager, resultPartitionFactory, - singleInputGateFactory); + singleInputGateFactory, + ioExecutor); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java index 7863f18..3116372 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.io.network.TaskEventPublisher; import java.net.InetAddress; +import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -40,6 +41,8 @@ public class ShuffleEnvironmentContext { private final TaskEventPublisher eventPublisher; private final MetricGroup parentMetricGroup; + private final Executor ioExecutor; + public ShuffleEnvironmentContext( Configuration configuration, ResourceID taskExecutorResourceId, @@ -47,7 +50,8 @@ public class ShuffleEnvironmentContext { boolean localCommunicationOnly, InetAddress hostAddress, TaskEventPublisher eventPublisher, - MetricGroup parentMetricGroup) { + MetricGroup parentMetricGroup, + Executor ioExecutor) { this.configuration = checkNotNull(configuration); this.taskExecutorResourceId = checkNotNull(taskExecutorResourceId); this.networkMemorySize = networkMemorySize; @@ -55,6 +59,7 @@ public class ShuffleEnvironmentContext { this.hostAddress = checkNotNull(hostAddress); this.eventPublisher = checkNotNull(eventPublisher); this.parentMetricGroup = checkNotNull(parentMetricGroup); + this.ioExecutor = ioExecutor; } public Configuration getConfiguration() { @@ -84,4 +89,8 @@ public class ShuffleEnvironmentContext { public MetricGroup getParentMetricGroup() { return parentMetricGroup; } + + public Executor getIoExecutor() { + return ioExecutor; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 019d186..7f66d03 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -31,6 +31,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobCacheService; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.entrypoint.ClusterConfiguration; @@ -373,10 +374,14 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync resourceID, taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval()); + final ExecutorService ioExecutor = Executors.newCachedThreadPool( + taskManagerServicesConfiguration.getNumIoThreads(), + new ExecutorThreadFactory("flink-taskexecutor-io")); + TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration, taskManagerMetricGroup.f1, - rpcService.getExecutor()); // TODO replace this later with some dedicated executor for io. + ioExecutor); TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration, taskExecutorResourceSpec); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 04c3b29..a443fb4 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; /** @@ -208,14 +209,14 @@ public class TaskManagerServices { * * @param taskManagerServicesConfiguration task manager configuration * @param taskManagerMetricGroup metric group of the task manager - * @param taskIOExecutor executor for async IO operations + * @param ioExecutor executor for async IO operations * @return task manager components * @throws Exception */ public static TaskManagerServices fromConfiguration( TaskManagerServicesConfiguration taskManagerServicesConfiguration, MetricGroup taskManagerMetricGroup, - Executor taskIOExecutor) throws Exception { + ExecutorService ioExecutor) throws Exception { // pre-start checks checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths()); @@ -228,7 +229,8 @@ public class TaskManagerServices { final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment( taskManagerServicesConfiguration, taskEventDispatcher, - taskManagerMetricGroup); + taskManagerMetricGroup, + ioExecutor); final int dataPort = shuffleEnvironment.start(); final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration); @@ -262,7 +264,7 @@ public class TaskManagerServices { final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager( taskManagerServicesConfiguration.isLocalRecoveryEnabled(), stateRootDirectoryFiles, - taskIOExecutor); + ioExecutor); return new TaskManagerServices( taskManagerLocation, @@ -297,7 +299,8 @@ public class TaskManagerServices { private static ShuffleEnvironment<?, ?> createShuffleEnvironment( TaskManagerServicesConfiguration taskManagerServicesConfiguration, TaskEventDispatcher taskEventDispatcher, - MetricGroup taskManagerMetricGroup) throws FlinkException { + MetricGroup taskManagerMetricGroup, + Executor ioExecutor) throws FlinkException { final ShuffleEnvironmentContext shuffleEnvironmentContext = new ShuffleEnvironmentContext( taskManagerServicesConfiguration.getConfiguration(), @@ -306,7 +309,8 @@ public class TaskManagerServices { taskManagerServicesConfiguration.isLocalCommunicationOnly(), taskManagerServicesConfiguration.getTaskManagerAddress(), taskEventDispatcher, - taskManagerMetricGroup); + taskManagerMetricGroup, + ioExecutor); return ShuffleServiceLoader .loadShuffleServiceFactory(taskManagerServicesConfiguration.getConfiguration()) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index 5ec28e0..480394c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.MemorySize; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration; +import org.apache.flink.runtime.util.ClusterEntrypointUtils; import org.apache.flink.runtime.util.ConfigurationParserUtils; import javax.annotation.Nullable; @@ -71,6 +72,8 @@ public class TaskManagerServicesConfiguration { private final TaskExecutorResourceSpec taskExecutorResourceSpec; + private final int numIoThreads; + public TaskManagerServicesConfiguration( Configuration configuration, ResourceID resourceID, @@ -85,7 +88,8 @@ public class TaskManagerServicesConfiguration { TaskExecutorResourceSpec taskExecutorResourceSpec, long timerServiceShutdownTimeout, RetryingRegistrationConfiguration retryingRegistrationConfiguration, - Optional<Time> systemResourceMetricsProbingInterval) { + Optional<Time> systemResourceMetricsProbingInterval, + int numIoThreads) { this.configuration = checkNotNull(configuration); this.resourceID = checkNotNull(resourceID); @@ -100,6 +104,7 @@ public class TaskManagerServicesConfiguration { this.pageSize = pageSize; this.taskExecutorResourceSpec = taskExecutorResourceSpec; + this.numIoThreads = numIoThreads; checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " + "service shutdown timeout must be greater or equal to 0."); @@ -178,6 +183,10 @@ public class TaskManagerServicesConfiguration { return retryingRegistrationConfiguration; } + public int getNumIoThreads() { + return numIoThreads; + } + // -------------------------------------------------------------------------------------------- // Parsing of Flink configuration // -------------------------------------------------------------------------------------------- @@ -215,6 +224,8 @@ public class TaskManagerServicesConfiguration { final RetryingRegistrationConfiguration retryingRegistrationConfiguration = RetryingRegistrationConfiguration.fromConfiguration(configuration); + final int numIoThreads = ClusterEntrypointUtils.getPoolSize(configuration); + return new TaskManagerServicesConfiguration( configuration, resourceID, @@ -229,6 +240,7 @@ public class TaskManagerServicesConfiguration { taskExecutorResourceSpec, timerServiceShutdownTimeout, retryingRegistrationConfiguration, - ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration)); + ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration), + numIoThreads); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java index ec29f82..26949cf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java @@ -20,13 +20,16 @@ package org.apache.flink.runtime.io.network; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.io.network.netty.NettyConfig; import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration; import org.apache.flink.runtime.util.EnvironmentInformation; import java.time.Duration; +import java.util.concurrent.Executor; /** * Builder for the {@link NettyShuffleEnvironment}. @@ -59,6 +62,10 @@ public class NettyShuffleEnvironmentBuilder { private MetricGroup metricGroup = UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(); + private ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); + + private Executor ioExecutor = Executors.directExecutor(); + public NettyShuffleEnvironmentBuilder setTaskManagerLocation(ResourceID taskManagerLocation) { this.taskManagerLocation = taskManagerLocation; return this; @@ -109,6 +116,16 @@ public class NettyShuffleEnvironmentBuilder { return this; } + public NettyShuffleEnvironmentBuilder setResultPartitionManager(ResultPartitionManager resultPartitionManager) { + this.resultPartitionManager = resultPartitionManager; + return this; + } + + public NettyShuffleEnvironmentBuilder setIoExecutor(Executor ioExecutor) { + this.ioExecutor = ioExecutor; + return this; + } + public NettyShuffleEnvironment build() { return NettyShuffleServiceFactory.createNettyShuffleEnvironment( new NettyShuffleEnvironmentConfiguration( @@ -128,6 +145,8 @@ public class NettyShuffleEnvironmentBuilder { compressionCodec), taskManagerLocation, new TaskEventDispatcher(), - metricGroup); + resultPartitionManager, + metricGroup, + ioExecutor); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java index b1a23a4..c0b094f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java @@ -20,10 +20,13 @@ package org.apache.flink.runtime.io.network; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.core.memory.MemorySegmentProvider; +import org.apache.flink.core.testutils.BlockerSync; import org.apache.flink.runtime.io.disk.FileChannelManager; import org.apache.flink.runtime.io.disk.FileChannelManagerImpl; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; @@ -40,6 +43,8 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.Executors; import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager; import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition; @@ -99,6 +104,27 @@ public class NettyShuffleEnvironmentTest extends TestLogger { testRegisterTaskWithLimitedBuffers(bufferCount); } + @Test + public void testSlowIODoesNotBlockRelease() throws Exception { + BlockerSync sync = new BlockerSync(); + ResultPartitionManager blockingResultPartitionManager = new ResultPartitionManager() { + @Override + public void releasePartition(ResultPartitionID partitionId, Throwable cause) { + sync.blockNonInterruptible(); + super.releasePartition(partitionId, cause); + } + }; + + NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder() + .setResultPartitionManager(blockingResultPartitionManager) + .setIoExecutor(Executors.newFixedThreadPool(1)) + .build(); + + shuffleEnvironment.releasePartitionsLocally(Collections.singleton(new ResultPartitionID())); + sync.awaitBlocker(); + sync.releaseBlocker(); + } + private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Exception { final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder() .setNumNetworkBuffers(bufferPoolSize) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java index 49d6fb4..bb0079c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java @@ -226,6 +226,6 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger { return TaskManagerServices.fromConfiguration( config, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - Executors.directExecutor()); + Executors.newDirectExecutorService()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java index 81b651f..66de0d8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java @@ -37,11 +37,15 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder; import org.apache.flink.runtime.io.network.partition.PartitionTestUtils; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionInfo; import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker; +import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl; import org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; @@ -61,6 +65,7 @@ import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils; import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.testutils.executor.TestExecutorResource; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; import org.apache.flink.util.function.TriConsumer; @@ -69,6 +74,7 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -81,6 +87,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.StreamSupport; @@ -107,6 +114,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { @Rule public final TemporaryFolder tmp = new TemporaryFolder(); + @ClassRule + public static final TestExecutorResource TEST_EXECUTOR_SERVICE_RESOURCE = + new TestExecutorResource(() -> java.util.concurrent.Executors.newFixedThreadPool(1)); + @Before public void setup() { haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever); @@ -243,6 +254,48 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { ); } + @Test + public void testBlockingLocalPartitionReleaseDoesNotBlockTaskExecutor() throws Exception { + BlockerSync sync = new BlockerSync(); + ResultPartitionManager blockingResultPartitionManager = new ResultPartitionManager() { + @Override + public void releasePartition(ResultPartitionID partitionId, Throwable cause) { + sync.blockNonInterruptible(); + super.releasePartition(partitionId, cause); + } + }; + + NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder() + .setResultPartitionManager(blockingResultPartitionManager) + .setIoExecutor(TEST_EXECUTOR_SERVICE_RESOURCE.getExecutor()) + .build(); + + final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>(); + final TaskExecutorPartitionTracker partitionTracker = new TaskExecutorPartitionTrackerImpl(shuffleEnvironment) { + @Override + public void startTrackingPartition(JobID producingJobId, TaskExecutorPartitionInfo partitionInfo) { + super.startTrackingPartition(producingJobId, partitionInfo); + startTrackingFuture.complete(partitionInfo.getResultPartitionId()); + } + }; + + try { + internalTestPartitionRelease( + partitionTracker, + shuffleEnvironment, + startTrackingFuture, + (jobId, partitionId, taskExecutor, taskExecutorGateway) -> { + taskExecutorGateway.releaseOrPromotePartitions(jobId, Collections.singleton(partitionId), Collections.emptySet()); + + // execute some operation to check whether the TaskExecutor is blocked + taskExecutorGateway.canBeReleased().get(5, TimeUnit.SECONDS); + } + ); + } finally { + sync.releaseBlocker(); + } + } + private void testPartitionRelease(PartitionTrackerSetup partitionTrackerSetup, TestAction testAction) throws Exception { final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker(); final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
