This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 79cb1a140991d13b2010d10c86e054fcced977c4
Author: Chesnay Schepler <[email protected]>
AuthorDate: Tue May 19 16:58:22 2020 +0200

    [FLINK-17558][netty] Release partitions asynchronously
---
 .../io/network/NettyShuffleEnvironment.java        | 15 ++++--
 .../io/network/NettyShuffleServiceFactory.java     | 31 ++++++++++--
 .../runtime/shuffle/ShuffleEnvironmentContext.java | 11 ++++-
 .../runtime/taskexecutor/TaskManagerRunner.java    |  7 ++-
 .../runtime/taskexecutor/TaskManagerServices.java  | 19 ++++----
 .../TaskManagerServicesConfiguration.java          | 18 +++++--
 .../io/network/NettyShuffleEnvironmentBuilder.java | 21 ++++++++-
 .../io/network/NettyShuffleEnvironmentTest.java    | 26 ++++++++++
 .../TaskExecutorLocalStateStoresManagerTest.java   |  2 +-
 .../TaskExecutorPartitionLifecycleTest.java        | 55 ++++++++++++++++++++++
 10 files changed, 179 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
index 8938f5e..61107ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
@@ -57,6 +57,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 import static 
org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_INPUT;
 import static 
org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.METRIC_GROUP_OUTPUT;
@@ -95,6 +96,8 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 
        private final SingleInputGateFactory singleInputGateFactory;
 
+       private final Executor ioExecutor;
+
        private boolean isClosed;
 
        NettyShuffleEnvironment(
@@ -105,7 +108,8 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
                        ResultPartitionManager resultPartitionManager,
                        FileChannelManager fileChannelManager,
                        ResultPartitionFactory resultPartitionFactory,
-                       SingleInputGateFactory singleInputGateFactory) {
+                       SingleInputGateFactory singleInputGateFactory,
+                       Executor ioExecutor) {
                this.taskExecutorResourceId = taskExecutorResourceId;
                this.config = config;
                this.networkBufferPool = networkBufferPool;
@@ -115,6 +119,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
                this.fileChannelManager = fileChannelManager;
                this.resultPartitionFactory = resultPartitionFactory;
                this.singleInputGateFactory = singleInputGateFactory;
+               this.ioExecutor = ioExecutor;
                this.isClosed = false;
        }
 
@@ -149,9 +154,11 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
 
        @Override
        public void releasePartitionsLocally(Collection<ResultPartitionID> 
partitionIds) {
-               for (ResultPartitionID partitionId : partitionIds) {
-                       resultPartitionManager.releasePartition(partitionId, 
null);
-               }
+               ioExecutor.execute(() -> {
+                       for (ResultPartitionID partitionId : partitionIds) {
+                               
resultPartitionManager.releasePartition(partitionId, null);
+                       }
+               });
        }
 
        /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index 9c4f95b..87ce8cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -38,6 +38,8 @@ import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
 import org.apache.flink.runtime.shuffle.ShuffleServiceFactory;
 import 
org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
 
+import java.util.concurrent.Executor;
+
 import static 
org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.registerShuffleMetrics;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -65,7 +67,8 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
                        networkConfig,
                        shuffleEnvironmentContext.getTaskExecutorResourceId(),
                        shuffleEnvironmentContext.getEventPublisher(),
-                       shuffleEnvironmentContext.getParentMetricGroup());
+                       shuffleEnvironmentContext.getParentMetricGroup(),
+                       shuffleEnvironmentContext.getIoExecutor());
        }
 
        @VisibleForTesting
@@ -73,16 +76,33 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
                        NettyShuffleEnvironmentConfiguration config,
                        ResourceID taskExecutorResourceId,
                        TaskEventPublisher taskEventPublisher,
-                       MetricGroup metricGroup) {
+                       MetricGroup metricGroup,
+                       Executor ioExecutor) {
+               return createNettyShuffleEnvironment(
+                       config,
+                       taskExecutorResourceId,
+                       taskEventPublisher,
+                       new ResultPartitionManager(),
+                       metricGroup,
+                       ioExecutor);
+       }
+
+       @VisibleForTesting
+       static NettyShuffleEnvironment createNettyShuffleEnvironment(
+                       NettyShuffleEnvironmentConfiguration config,
+                       ResourceID taskExecutorResourceId,
+                       TaskEventPublisher taskEventPublisher,
+                       ResultPartitionManager resultPartitionManager,
+                       MetricGroup metricGroup,
+                       Executor ioExecutor) {
                checkNotNull(config);
                checkNotNull(taskExecutorResourceId);
                checkNotNull(taskEventPublisher);
+               checkNotNull(resultPartitionManager);
                checkNotNull(metricGroup);
 
                NettyConfig nettyConfig = config.nettyConfig();
 
-               ResultPartitionManager resultPartitionManager = new 
ResultPartitionManager();
-
                FileChannelManager fileChannelManager = new 
FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX);
 
                ConnectionManager connectionManager = nettyConfig != null ?
@@ -126,6 +146,7 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
                        resultPartitionManager,
                        fileChannelManager,
                        resultPartitionFactory,
-                       singleInputGateFactory);
+                       singleInputGateFactory,
+                       ioExecutor);
        }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
index 7863f18..3116372 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
 
 import java.net.InetAddress;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -40,6 +41,8 @@ public class ShuffleEnvironmentContext {
        private final TaskEventPublisher eventPublisher;
        private final MetricGroup parentMetricGroup;
 
+       private final Executor ioExecutor;
+
        public ShuffleEnvironmentContext(
                        Configuration configuration,
                        ResourceID taskExecutorResourceId,
@@ -47,7 +50,8 @@ public class ShuffleEnvironmentContext {
                        boolean localCommunicationOnly,
                        InetAddress hostAddress,
                        TaskEventPublisher eventPublisher,
-                       MetricGroup parentMetricGroup) {
+                       MetricGroup parentMetricGroup,
+                       Executor ioExecutor) {
                this.configuration = checkNotNull(configuration);
                this.taskExecutorResourceId = 
checkNotNull(taskExecutorResourceId);
                this.networkMemorySize = networkMemorySize;
@@ -55,6 +59,7 @@ public class ShuffleEnvironmentContext {
                this.hostAddress = checkNotNull(hostAddress);
                this.eventPublisher = checkNotNull(eventPublisher);
                this.parentMetricGroup = checkNotNull(parentMetricGroup);
+               this.ioExecutor = ioExecutor;
        }
 
        public Configuration getConfiguration() {
@@ -84,4 +89,8 @@ public class ShuffleEnvironmentContext {
        public MetricGroup getParentMetricGroup() {
                return parentMetricGroup;
        }
+
+       public Executor getIoExecutor() {
+               return ioExecutor;
+       }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 88786bf..9772700 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -30,6 +30,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
@@ -366,11 +367,15 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
                        resourceID,
                        
taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
 
+               final ExecutorService ioExecutor = 
Executors.newCachedThreadPool(
+                       taskManagerServicesConfiguration.getNumIoThreads(),
+                       new ExecutorThreadFactory("flink-taskexecutor-io"));
+
                TaskManagerServices taskManagerServices = 
TaskManagerServices.fromConfiguration(
                        taskManagerServicesConfiguration,
                        blobCacheService.getPermanentBlobService(),
                        taskManagerMetricGroup.f1,
-                       rpcService.getExecutor()); // TODO replace this later 
with some dedicated executor for io.
+                       ioExecutor);
 
                TaskManagerConfiguration taskManagerConfiguration =
                        
TaskManagerConfiguration.fromConfiguration(configuration, 
taskExecutorResourceSpec, externalAddress);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index ee737c1..d6aa2c4 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
-import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -50,7 +49,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 /**
@@ -247,7 +245,7 @@ public class TaskManagerServices {
         * @param taskManagerServicesConfiguration task manager configuration
         * @param permanentBlobService permanentBlobService used by the services
         * @param taskManagerMetricGroup metric group of the task manager
-        * @param taskIOExecutor executor for async IO operations
+        * @param ioExecutor executor for async IO operations
         * @return task manager components
         * @throws Exception
         */
@@ -255,7 +253,7 @@ public class TaskManagerServices {
                        TaskManagerServicesConfiguration 
taskManagerServicesConfiguration,
                        PermanentBlobService permanentBlobService,
                        MetricGroup taskManagerMetricGroup,
-                       Executor taskIOExecutor) throws Exception {
+                       ExecutorService ioExecutor) throws Exception {
 
                // pre-start checks
                
checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
@@ -268,7 +266,8 @@ public class TaskManagerServices {
                final ShuffleEnvironment<?, ?> shuffleEnvironment = 
createShuffleEnvironment(
                        taskManagerServicesConfiguration,
                        taskEventDispatcher,
-                       taskManagerMetricGroup);
+                       taskManagerMetricGroup,
+                       ioExecutor);
                final int listeningDataPort = shuffleEnvironment.start();
 
                final KvStateService kvStateService = 
KvStateService.fromConfiguration(taskManagerServicesConfiguration);
@@ -306,9 +305,7 @@ public class TaskManagerServices {
                final TaskExecutorLocalStateStoresManager taskStateManager = 
new TaskExecutorLocalStateStoresManager(
                        
taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
                        stateRootDirectoryFiles,
-                       taskIOExecutor);
-
-               final ExecutorService ioExecutor = 
Executors.newSingleThreadExecutor(new ExecutorThreadFactory("taskexecutor-io"));
+                       ioExecutor);
 
                final LibraryCacheManager libraryCacheManager = new 
BlobLibraryCacheManager(
                        permanentBlobService,
@@ -351,7 +348,8 @@ public class TaskManagerServices {
        private static ShuffleEnvironment<?, ?> createShuffleEnvironment(
                        TaskManagerServicesConfiguration 
taskManagerServicesConfiguration,
                        TaskEventDispatcher taskEventDispatcher,
-                       MetricGroup taskManagerMetricGroup) throws 
FlinkException {
+                       MetricGroup taskManagerMetricGroup,
+                       Executor ioExecutor) throws FlinkException {
 
                final ShuffleEnvironmentContext shuffleEnvironmentContext = new 
ShuffleEnvironmentContext(
                        taskManagerServicesConfiguration.getConfiguration(),
@@ -360,7 +358,8 @@ public class TaskManagerServices {
                        
taskManagerServicesConfiguration.isLocalCommunicationOnly(),
                        taskManagerServicesConfiguration.getBindAddress(),
                        taskEventDispatcher,
-                       taskManagerMetricGroup);
+                       taskManagerMetricGroup,
+                       ioExecutor);
 
                return ShuffleServiceLoader
                        
.loadShuffleServiceFactory(taskManagerServicesConfiguration.getConfiguration())
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index bb50b62..5d126ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
+import org.apache.flink.runtime.util.ClusterEntrypointUtils;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 import org.apache.flink.util.NetUtils;
 
@@ -84,7 +85,9 @@ public class TaskManagerServicesConfiguration {
 
        private final String[] alwaysParentFirstLoaderPatterns;
 
-       public TaskManagerServicesConfiguration(
+       private final int numIoThreads;
+
+       private TaskManagerServicesConfiguration(
                        Configuration configuration,
                        ResourceID resourceID,
                        String externalAddress,
@@ -102,7 +105,8 @@ public class TaskManagerServicesConfiguration {
                        RetryingRegistrationConfiguration 
retryingRegistrationConfiguration,
                        Optional<Time> systemResourceMetricsProbingInterval,
                        FlinkUserCodeClassLoaders.ResolveOrder 
classLoaderResolveOrder,
-                       String[] alwaysParentFirstLoaderPatterns) {
+                       String[] alwaysParentFirstLoaderPatterns,
+                       int numIoThreads) {
                this.configuration = checkNotNull(configuration);
                this.resourceID = checkNotNull(resourceID);
 
@@ -121,6 +125,7 @@ public class TaskManagerServicesConfiguration {
                this.taskExecutorResourceSpec = taskExecutorResourceSpec;
                this.classLoaderResolveOrder = classLoaderResolveOrder;
                this.alwaysParentFirstLoaderPatterns = 
alwaysParentFirstLoaderPatterns;
+               this.numIoThreads = numIoThreads;
 
                checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " +
                        "service shutdown timeout must be greater or equal to 
0.");
@@ -215,6 +220,10 @@ public class TaskManagerServicesConfiguration {
                return alwaysParentFirstLoaderPatterns;
        }
 
+       public int getNumIoThreads() {
+               return numIoThreads;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Parsing of Flink configuration
        // 
--------------------------------------------------------------------------------------------
@@ -262,6 +271,8 @@ public class TaskManagerServicesConfiguration {
 
                final String[] alwaysParentFirstLoaderPatterns = 
CoreOptions.getParentFirstLoaderPatterns(configuration);
 
+               final int numIoThreads = 
ClusterEntrypointUtils.getPoolSize(configuration);
+
                return new TaskManagerServicesConfiguration(
                        configuration,
                        resourceID,
@@ -280,6 +291,7 @@ public class TaskManagerServicesConfiguration {
                        retryingRegistrationConfiguration,
                        
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration),
                        
FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
-                       alwaysParentFirstLoaderPatterns);
+                       alwaysParentFirstLoaderPatterns,
+                       numIoThreads);
        }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index 5c377d2..ab999fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -20,13 +20,16 @@ package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import 
org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import 
org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
 import java.time.Duration;
+import java.util.concurrent.Executor;
 
 /**
  * Builder for the {@link NettyShuffleEnvironment}.
@@ -63,6 +66,10 @@ public class NettyShuffleEnvironmentBuilder {
 
        private MetricGroup metricGroup = 
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
 
+       private ResultPartitionManager resultPartitionManager = new 
ResultPartitionManager();
+
+       private Executor ioExecutor = Executors.directExecutor();
+
        public NettyShuffleEnvironmentBuilder setTaskManagerLocation(ResourceID 
taskManagerLocation) {
                this.taskManagerLocation = taskManagerLocation;
                return this;
@@ -123,6 +130,16 @@ public class NettyShuffleEnvironmentBuilder {
                return this;
        }
 
+       public NettyShuffleEnvironmentBuilder 
setResultPartitionManager(ResultPartitionManager resultPartitionManager) {
+               this.resultPartitionManager = resultPartitionManager;
+               return this;
+       }
+
+       public NettyShuffleEnvironmentBuilder setIoExecutor(Executor 
ioExecutor) {
+               this.ioExecutor = ioExecutor;
+               return this;
+       }
+
        public NettyShuffleEnvironment build() {
                return NettyShuffleServiceFactory.createNettyShuffleEnvironment(
                        new NettyShuffleEnvironmentConfiguration(
@@ -143,6 +160,8 @@ public class NettyShuffleEnvironmentBuilder {
                                maxBuffersPerChannel),
                        taskManagerLocation,
                        new TaskEventDispatcher(),
-                       metricGroup);
+                       resultPartitionManager,
+                       metricGroup,
+                       ioExecutor);
        }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
index 826f15c..62aba7e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
@@ -19,10 +19,13 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.core.testutils.BlockerSync;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
@@ -41,6 +44,8 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.Executors;
 
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
 import static 
org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
@@ -100,6 +105,27 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
                testRegisterTaskWithLimitedBuffers(bufferCount);
        }
 
+       @Test
+       public void testSlowIODoesNotBlockRelease() throws Exception {
+               BlockerSync sync = new BlockerSync();
+               ResultPartitionManager blockingResultPartitionManager = new 
ResultPartitionManager() {
+                       @Override
+                       public void releasePartition(ResultPartitionID 
partitionId, Throwable cause) {
+                               sync.blockNonInterruptible();
+                               super.releasePartition(partitionId, cause);
+                       }
+               };
+
+               NettyShuffleEnvironment shuffleEnvironment = new 
NettyShuffleEnvironmentBuilder()
+                       
.setResultPartitionManager(blockingResultPartitionManager)
+                       .setIoExecutor(Executors.newFixedThreadPool(1))
+                       .build();
+
+               
shuffleEnvironment.releasePartitionsLocally(Collections.singleton(new 
ResultPartitionID()));
+               sync.awaitBlocker();
+               sync.releaseBlocker();
+       }
+
        private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) 
throws Exception {
                final NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
                        .setNumNetworkBuffers(bufferPoolSize)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index 0560184..958b92c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -227,6 +227,6 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
                        config,
                        VoidPermanentBlobService.INSTANCE,
                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
-                       Executors.directExecutor());
+                       Executors.newDirectExecutorService());
        }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index b538b36..9a3afa1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -38,11 +38,15 @@ import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionInfo;
 import 
org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
+import 
org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl;
 import 
org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -63,6 +67,7 @@ import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
 import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.testutils.executor.TestExecutorResource;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.TriConsumer;
@@ -71,6 +76,7 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -84,6 +90,7 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.StreamSupport;
 
@@ -110,6 +117,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
        @Rule
        public final TemporaryFolder tmp = new TemporaryFolder();
 
+       @ClassRule
+       public static final TestExecutorResource TEST_EXECUTOR_SERVICE_RESOURCE 
=
+               new TestExecutorResource(() -> 
java.util.concurrent.Executors.newFixedThreadPool(1));
+
        @Before
        public void setup() {
                
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
@@ -268,6 +279,50 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
                );
        }
 
+       @Test
+       public void testBlockingLocalPartitionReleaseDoesNotBlockTaskExecutor() 
throws Exception {
+               BlockerSync sync = new BlockerSync();
+               ResultPartitionManager blockingResultPartitionManager = new 
ResultPartitionManager() {
+                       @Override
+                       public void releasePartition(ResultPartitionID 
partitionId, Throwable cause) {
+                               sync.blockNonInterruptible();
+                               super.releasePartition(partitionId, cause);
+                       }
+               };
+
+               NettyShuffleEnvironment shuffleEnvironment = new 
NettyShuffleEnvironmentBuilder()
+                       
.setResultPartitionManager(blockingResultPartitionManager)
+                       
.setIoExecutor(TEST_EXECUTOR_SERVICE_RESOURCE.getExecutor())
+                       .build();
+
+               final CompletableFuture<ResultPartitionID> startTrackingFuture 
= new CompletableFuture<>();
+               final TaskExecutorPartitionTracker partitionTracker = new 
TaskExecutorPartitionTrackerImpl(shuffleEnvironment) {
+                       @Override
+                       public void startTrackingPartition(JobID 
producingJobId, TaskExecutorPartitionInfo partitionInfo) {
+                               super.startTrackingPartition(producingJobId, 
partitionInfo);
+                               
startTrackingFuture.complete(partitionInfo.getResultPartitionId());
+                       }
+               };
+
+               try {
+                       internalTestPartitionRelease(
+                               partitionTracker,
+                               shuffleEnvironment,
+                               startTrackingFuture,
+                               (jobId, resultPartitionDeploymentDescriptor, 
taskExecutor, taskExecutorGateway) -> {
+                                       final IntermediateDataSetID dataSetId = 
resultPartitionDeploymentDescriptor.getResultId();
+
+                                       
taskExecutorGateway.releaseClusterPartitions(Collections.singleton(dataSetId), 
timeout);
+
+                                       // execute some operation to check 
whether the TaskExecutor is blocked
+                                       
taskExecutorGateway.canBeReleased().get(5, TimeUnit.SECONDS);
+                               }
+                       );
+               } finally {
+                       sync.releaseBlocker();
+               }
+       }
+
        private void testPartitionRelease(PartitionTrackerSetup 
partitionTrackerSetup, TestAction testAction) throws Exception {
                final TestingTaskExecutorPartitionTracker partitionTracker = 
new TestingTaskExecutorPartitionTracker();
                final CompletableFuture<ResultPartitionID> startTrackingFuture 
= new CompletableFuture<>();

Reply via email to