This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c6c0839d40571bb9ad47b8dbbec0b0ea14615372
Author: Chesnay Schepler <[email protected]>
AuthorDate: Tue May 19 16:58:22 2020 +0200

    [FLINK-17558][netty] Release partitions asynchronously
---
 .../io/network/NettyShuffleEnvironment.java        | 15 ++++--
 .../io/network/NettyShuffleServiceFactory.java     | 31 +++++++++++--
 .../runtime/shuffle/ShuffleEnvironmentContext.java | 11 ++++-
 .../runtime/taskexecutor/TaskManagerRunner.java    |  7 ++-
 .../runtime/taskexecutor/TaskManagerServices.java  | 16 ++++---
 .../TaskManagerServicesConfiguration.java          | 16 ++++++-
 .../io/network/NettyShuffleEnvironmentBuilder.java | 21 ++++++++-
 .../io/network/NettyShuffleEnvironmentTest.java    | 26 +++++++++++
 .../TaskExecutorLocalStateStoresManagerTest.java   |  2 +-
 .../TaskExecutorPartitionLifecycleTest.java        | 53 ++++++++++++++++++++++
 10 files changed, 177 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
index 2d2d6f3..5e7c119 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
@@ -56,6 +56,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 import static 
org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_INPUT;
 import static 
org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_OUTPUT;
@@ -94,6 +95,8 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 
        private final SingleInputGateFactory singleInputGateFactory;
 
+       private final Executor ioExecutor;
+
        private boolean isClosed;
 
        NettyShuffleEnvironment(
@@ -104,7 +107,8 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
                        ResultPartitionManager resultPartitionManager,
                        FileChannelManager fileChannelManager,
                        ResultPartitionFactory resultPartitionFactory,
-                       SingleInputGateFactory singleInputGateFactory) {
+                       SingleInputGateFactory singleInputGateFactory,
+                       Executor ioExecutor) {
                this.taskExecutorResourceId = taskExecutorResourceId;
                this.config = config;
                this.networkBufferPool = networkBufferPool;
@@ -114,6 +118,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
                this.fileChannelManager = fileChannelManager;
                this.resultPartitionFactory = resultPartitionFactory;
                this.singleInputGateFactory = singleInputGateFactory;
+               this.ioExecutor = ioExecutor;
                this.isClosed = false;
        }
 
@@ -148,9 +153,11 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 
        @Override
        public void releasePartitionsLocally(Collection<ResultPartitionID> 
partitionIds) {
-               for (ResultPartitionID partitionId : partitionIds) {
-                       resultPartitionManager.releasePartition(partitionId, 
null);
-               }
+               ioExecutor.execute(() -> {
+                       for (ResultPartitionID partitionId : partitionIds) {
+                               
resultPartitionManager.releasePartition(partitionId, null);
+                       }
+               });
        }
 
        /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index afbaba2..f065122 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -38,6 +38,8 @@ import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
 import org.apache.flink.runtime.shuffle.ShuffleServiceFactory;
 import 
org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
 
+import java.util.concurrent.Executor;
+
 import static 
org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.registerShuffleMetrics;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -65,7 +67,8 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
                        networkConfig,
                        shuffleEnvironmentContext.getTaskExecutorResourceId(),
                        shuffleEnvironmentContext.getEventPublisher(),
-                       shuffleEnvironmentContext.getParentMetricGroup());
+                       shuffleEnvironmentContext.getParentMetricGroup(),
+                       shuffleEnvironmentContext.getIoExecutor());
        }
 
        @VisibleForTesting
@@ -73,16 +76,33 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
                        NettyShuffleEnvironmentConfiguration config,
                        ResourceID taskExecutorResourceId,
                        TaskEventPublisher taskEventPublisher,
-                       MetricGroup metricGroup) {
+                       MetricGroup metricGroup,
+                       Executor ioExecutor) {
+               return createNettyShuffleEnvironment(
+                       config,
+                       taskExecutorResourceId,
+                       taskEventPublisher,
+                       new ResultPartitionManager(),
+                       metricGroup,
+                       ioExecutor);
+       }
+
+       @VisibleForTesting
+       static NettyShuffleEnvironment createNettyShuffleEnvironment(
+                       NettyShuffleEnvironmentConfiguration config,
+                       ResourceID taskExecutorResourceId,
+                       TaskEventPublisher taskEventPublisher,
+                       ResultPartitionManager resultPartitionManager,
+                       MetricGroup metricGroup,
+                       Executor ioExecutor) {
                checkNotNull(config);
                checkNotNull(taskExecutorResourceId);
                checkNotNull(taskEventPublisher);
+               checkNotNull(resultPartitionManager);
                checkNotNull(metricGroup);
 
                NettyConfig nettyConfig = config.nettyConfig();
 
-               ResultPartitionManager resultPartitionManager = new 
ResultPartitionManager();
-
                FileChannelManager fileChannelManager = new 
FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX);
 
                ConnectionManager connectionManager = nettyConfig != null ?
@@ -125,6 +145,7 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
                        resultPartitionManager,
                        fileChannelManager,
                        resultPartitionFactory,
-                       singleInputGateFactory);
+                       singleInputGateFactory,
+                       ioExecutor);
        }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
index 7863f18..3116372 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
 
 import java.net.InetAddress;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -40,6 +41,8 @@ public class ShuffleEnvironmentContext {
        private final TaskEventPublisher eventPublisher;
        private final MetricGroup parentMetricGroup;
 
+       private final Executor ioExecutor;
+
        public ShuffleEnvironmentContext(
                        Configuration configuration,
                        ResourceID taskExecutorResourceId,
@@ -47,7 +50,8 @@ public class ShuffleEnvironmentContext {
                        boolean localCommunicationOnly,
                        InetAddress hostAddress,
                        TaskEventPublisher eventPublisher,
-                       MetricGroup parentMetricGroup) {
+                       MetricGroup parentMetricGroup,
+                       Executor ioExecutor) {
                this.configuration = checkNotNull(configuration);
                this.taskExecutorResourceId = 
checkNotNull(taskExecutorResourceId);
                this.networkMemorySize = networkMemorySize;
@@ -55,6 +59,7 @@ public class ShuffleEnvironmentContext {
                this.hostAddress = checkNotNull(hostAddress);
                this.eventPublisher = checkNotNull(eventPublisher);
                this.parentMetricGroup = checkNotNull(parentMetricGroup);
+               this.ioExecutor = ioExecutor;
        }
 
        public Configuration getConfiguration() {
@@ -84,4 +89,8 @@ public class ShuffleEnvironmentContext {
        public MetricGroup getParentMetricGroup() {
                return parentMetricGroup;
        }
+
+       public Executor getIoExecutor() {
+               return ioExecutor;
+       }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 019d186..7f66d03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -31,6 +31,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.entrypoint.ClusterConfiguration;
@@ -373,10 +374,14 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
                        resourceID,
                        
taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
 
+               final ExecutorService ioExecutor = 
Executors.newCachedThreadPool(
+                       taskManagerServicesConfiguration.getNumIoThreads(),
+                       new ExecutorThreadFactory("flink-taskexecutor-io"));
+
                TaskManagerServices taskManagerServices = 
TaskManagerServices.fromConfiguration(
                        taskManagerServicesConfiguration,
                        taskManagerMetricGroup.f1,
-                       rpcService.getExecutor()); // TODO replace this later 
with some dedicated executor for io.
+                       ioExecutor);
 
                TaskManagerConfiguration taskManagerConfiguration =
                        
TaskManagerConfiguration.fromConfiguration(configuration, 
taskExecutorResourceSpec);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 04c3b29..a443fb4 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 /**
@@ -208,14 +209,14 @@ public class TaskManagerServices {
         *
         * @param taskManagerServicesConfiguration task manager configuration
         * @param taskManagerMetricGroup metric group of the task manager
-        * @param taskIOExecutor executor for async IO operations
+        * @param ioExecutor executor for async IO operations
         * @return task manager components
         * @throws Exception
         */
        public static TaskManagerServices fromConfiguration(
                        TaskManagerServicesConfiguration 
taskManagerServicesConfiguration,
                        MetricGroup taskManagerMetricGroup,
-                       Executor taskIOExecutor) throws Exception {
+                       ExecutorService ioExecutor) throws Exception {
 
                // pre-start checks
                
checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
@@ -228,7 +229,8 @@ public class TaskManagerServices {
                final ShuffleEnvironment<?, ?> shuffleEnvironment = 
createShuffleEnvironment(
                        taskManagerServicesConfiguration,
                        taskEventDispatcher,
-                       taskManagerMetricGroup);
+                       taskManagerMetricGroup,
+                       ioExecutor);
                final int dataPort = shuffleEnvironment.start();
 
                final KvStateService kvStateService = 
KvStateService.fromConfiguration(taskManagerServicesConfiguration);
@@ -262,7 +264,7 @@ public class TaskManagerServices {
                final TaskExecutorLocalStateStoresManager taskStateManager = 
new TaskExecutorLocalStateStoresManager(
                        
taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
                        stateRootDirectoryFiles,
-                       taskIOExecutor);
+                       ioExecutor);
 
                return new TaskManagerServices(
                        taskManagerLocation,
@@ -297,7 +299,8 @@ public class TaskManagerServices {
        private static ShuffleEnvironment<?, ?> createShuffleEnvironment(
                        TaskManagerServicesConfiguration 
taskManagerServicesConfiguration,
                        TaskEventDispatcher taskEventDispatcher,
-                       MetricGroup taskManagerMetricGroup) throws 
FlinkException {
+                       MetricGroup taskManagerMetricGroup,
+                       Executor ioExecutor) throws FlinkException {
 
                final ShuffleEnvironmentContext shuffleEnvironmentContext = new 
ShuffleEnvironmentContext(
                        taskManagerServicesConfiguration.getConfiguration(),
@@ -306,7 +309,8 @@ public class TaskManagerServices {
                        
taskManagerServicesConfiguration.isLocalCommunicationOnly(),
                        
taskManagerServicesConfiguration.getTaskManagerAddress(),
                        taskEventDispatcher,
-                       taskManagerMetricGroup);
+                       taskManagerMetricGroup,
+                       ioExecutor);
 
                return ShuffleServiceLoader
                        
.loadShuffleServiceFactory(taskManagerServicesConfiguration.getConfiguration())
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 5ec28e0..480394c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
+import org.apache.flink.runtime.util.ClusterEntrypointUtils;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 
 import javax.annotation.Nullable;
@@ -71,6 +72,8 @@ public class TaskManagerServicesConfiguration {
 
        private final TaskExecutorResourceSpec taskExecutorResourceSpec;
 
+       private final int numIoThreads;
+
        public TaskManagerServicesConfiguration(
                        Configuration configuration,
                        ResourceID resourceID,
@@ -85,7 +88,8 @@ public class TaskManagerServicesConfiguration {
                        TaskExecutorResourceSpec taskExecutorResourceSpec,
                        long timerServiceShutdownTimeout,
                        RetryingRegistrationConfiguration 
retryingRegistrationConfiguration,
-                       Optional<Time> systemResourceMetricsProbingInterval) {
+                       Optional<Time> systemResourceMetricsProbingInterval,
+                       int numIoThreads) {
                this.configuration = checkNotNull(configuration);
                this.resourceID = checkNotNull(resourceID);
 
@@ -100,6 +104,7 @@ public class TaskManagerServicesConfiguration {
                this.pageSize = pageSize;
 
                this.taskExecutorResourceSpec = taskExecutorResourceSpec;
+               this.numIoThreads = numIoThreads;
 
                checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " +
                        "service shutdown timeout must be greater or equal to 
0.");
@@ -178,6 +183,10 @@ public class TaskManagerServicesConfiguration {
                return retryingRegistrationConfiguration;
        }
 
+       public int getNumIoThreads() {
+               return numIoThreads;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Parsing of Flink configuration
        // 
--------------------------------------------------------------------------------------------
@@ -215,6 +224,8 @@ public class TaskManagerServicesConfiguration {
 
                final RetryingRegistrationConfiguration 
retryingRegistrationConfiguration = 
RetryingRegistrationConfiguration.fromConfiguration(configuration);
 
+               final int numIoThreads = 
ClusterEntrypointUtils.getPoolSize(configuration);
+
                return new TaskManagerServicesConfiguration(
                        configuration,
                        resourceID,
@@ -229,6 +240,7 @@ public class TaskManagerServicesConfiguration {
                        taskExecutorResourceSpec,
                        timerServiceShutdownTimeout,
                        retryingRegistrationConfiguration,
-                       
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
+                       
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration),
+                       numIoThreads);
        }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index ec29f82..26949cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -20,13 +20,16 @@ package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import 
org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import 
org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
 import java.time.Duration;
+import java.util.concurrent.Executor;
 
 /**
  * Builder for the {@link NettyShuffleEnvironment}.
@@ -59,6 +62,10 @@ public class NettyShuffleEnvironmentBuilder {
 
        private MetricGroup metricGroup = 
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
 
+       private ResultPartitionManager resultPartitionManager = new 
ResultPartitionManager();
+
+       private Executor ioExecutor = Executors.directExecutor();
+
        public NettyShuffleEnvironmentBuilder setTaskManagerLocation(ResourceID 
taskManagerLocation) {
                this.taskManagerLocation = taskManagerLocation;
                return this;
@@ -109,6 +116,16 @@ public class NettyShuffleEnvironmentBuilder {
                return this;
        }
 
+       public NettyShuffleEnvironmentBuilder 
setResultPartitionManager(ResultPartitionManager resultPartitionManager) {
+               this.resultPartitionManager = resultPartitionManager;
+               return this;
+       }
+
+       public NettyShuffleEnvironmentBuilder setIoExecutor(Executor 
ioExecutor) {
+               this.ioExecutor = ioExecutor;
+               return this;
+       }
+
        public NettyShuffleEnvironment build() {
                return NettyShuffleServiceFactory.createNettyShuffleEnvironment(
                        new NettyShuffleEnvironmentConfiguration(
@@ -128,6 +145,8 @@ public class NettyShuffleEnvironmentBuilder {
                                compressionCodec),
                        taskManagerLocation,
                        new TaskEventDispatcher(),
-                       metricGroup);
+                       resultPartitionManager,
+                       metricGroup,
+                       ioExecutor);
        }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
index b1a23a4..c0b094f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
@@ -20,10 +20,13 @@ package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.core.testutils.BlockerSync;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -40,6 +43,8 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.Executors;
 
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
 import static 
org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
@@ -99,6 +104,27 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
                testRegisterTaskWithLimitedBuffers(bufferCount);
        }
 
+       @Test
+       public void testSlowIODoesNotBlockRelease() throws Exception {
+               BlockerSync sync = new BlockerSync();
+               ResultPartitionManager blockingResultPartitionManager = new 
ResultPartitionManager() {
+                       @Override
+                       public void releasePartition(ResultPartitionID 
partitionId, Throwable cause) {
+                               sync.blockNonInterruptible();
+                               super.releasePartition(partitionId, cause);
+                       }
+               };
+
+               NettyShuffleEnvironment shuffleEnvironment = new 
NettyShuffleEnvironmentBuilder()
+                       
.setResultPartitionManager(blockingResultPartitionManager)
+                       .setIoExecutor(Executors.newFixedThreadPool(1))
+                       .build();
+
+               
shuffleEnvironment.releasePartitionsLocally(Collections.singleton(new 
ResultPartitionID()));
+               sync.awaitBlocker();
+               sync.releaseBlocker();
+       }
+
        private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) 
throws Exception {
                final NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
                        .setNumNetworkBuffers(bufferPoolSize)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index 49d6fb4..bb0079c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -226,6 +226,6 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
                return TaskManagerServices.fromConfiguration(
                        config,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       Executors.directExecutor());
+                       Executors.newDirectExecutorService());
        }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 81b651f..66de0d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -37,11 +37,15 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionInfo;
 import 
org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
+import 
org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl;
 import 
org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
@@ -61,6 +65,7 @@ import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
 import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.testutils.executor.TestExecutorResource;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.TriConsumer;
@@ -69,6 +74,7 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -81,6 +87,7 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.StreamSupport;
 
@@ -107,6 +114,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
        @Rule
        public final TemporaryFolder tmp = new TemporaryFolder();
 
+       @ClassRule
+       public static final TestExecutorResource TEST_EXECUTOR_SERVICE_RESOURCE 
=
+               new TestExecutorResource(() -> 
java.util.concurrent.Executors.newFixedThreadPool(1));
+
        @Before
        public void setup() {
                
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
@@ -243,6 +254,48 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
                );
        }
 
+       @Test
+       public void testBlockingLocalPartitionReleaseDoesNotBlockTaskExecutor() 
throws Exception {
+               BlockerSync sync = new BlockerSync();
+               ResultPartitionManager blockingResultPartitionManager = new 
ResultPartitionManager() {
+                       @Override
+                       public void releasePartition(ResultPartitionID 
partitionId, Throwable cause) {
+                               sync.blockNonInterruptible();
+                               super.releasePartition(partitionId, cause);
+                       }
+               };
+
+               NettyShuffleEnvironment shuffleEnvironment = new 
NettyShuffleEnvironmentBuilder()
+                       
.setResultPartitionManager(blockingResultPartitionManager)
+                       
.setIoExecutor(TEST_EXECUTOR_SERVICE_RESOURCE.getExecutor())
+                       .build();
+
+               final CompletableFuture<ResultPartitionID> startTrackingFuture 
= new CompletableFuture<>();
+               final TaskExecutorPartitionTracker partitionTracker = new 
TaskExecutorPartitionTrackerImpl(shuffleEnvironment) {
+                       @Override
+                       public void startTrackingPartition(JobID 
producingJobId, TaskExecutorPartitionInfo partitionInfo) {
+                               super.startTrackingPartition(producingJobId, 
partitionInfo);
+                               
startTrackingFuture.complete(partitionInfo.getResultPartitionId());
+                       }
+               };
+
+               try {
+                       internalTestPartitionRelease(
+                               partitionTracker,
+                               shuffleEnvironment,
+                               startTrackingFuture,
+                               (jobId, partitionId, taskExecutor, 
taskExecutorGateway) -> {
+                                       
taskExecutorGateway.releaseOrPromotePartitions(jobId, 
Collections.singleton(partitionId), Collections.emptySet());
+
+                                       // execute some operation to check 
whether the TaskExecutor is blocked
+                                       
taskExecutorGateway.canBeReleased().get(5, TimeUnit.SECONDS);
+                               }
+                       );
+               } finally {
+                       sync.releaseBlocker();
+               }
+       }
+
        private void testPartitionRelease(PartitionTrackerSetup 
partitionTrackerSetup, TestAction testAction) throws Exception {
                final TestingTaskExecutorPartitionTracker partitionTracker = 
new TestingTaskExecutorPartitionTracker();
                final CompletableFuture<ResultPartitionID> startTrackingFuture 
= new CompletableFuture<>();

Reply via email to