tillrohrmann commented on a change in pull request #12264:
URL: https://github.com/apache/flink/pull/12264#discussion_r428047035



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
##########
@@ -280,7 +273,65 @@ public void testClusterPartitionRelease() throws Exception {
                );
        }
 
-       private <C> void testPartitionRelease(PartitionTrackerSetup<C> partitionTrackerSetup, TestAction<C> testAction) throws Exception {
+       @Test
+       public void testBlockingLocalPartitionReleaseDoesNotBlockTaskExecutor() throws Exception {
+               BlockerSync sync = new BlockerSync();
+               ResultPartitionManager blockingResultPartitionManager = new ResultPartitionManager() {
+                       @Override
+                       public void releasePartition(ResultPartitionID partitionId, Throwable cause) {
+                               sync.blockNonInterruptible();
+                               super.releasePartition(partitionId, cause);
+                       }
+               };
+
+               NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder()
+                       .setResultPartitionManager(blockingResultPartitionManager)
+                       .setIoExecutor(java.util.concurrent.Executors.newFixedThreadPool(1))

Review comment:
       I would suggest also shutting down this executor service at the end of 
the test. The blocked release operation may need to be unblocked first for the 
shutdown to complete.
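
       A minimal, self-contained sketch of that cleanup order, using only 
`java.util.concurrent`. The `CountDownLatch` is a stand-in for `BlockerSync`, 
and the class and method names are hypothetical, not taken from the PR:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorShutdownSketch {

    /** Blocks until the latch is released, ignoring interrupts (mimics a non-interruptible block). */
    static void awaitUninterruptibly(CountDownLatch latch) {
        while (true) {
            try {
                latch.await();
                return;
            } catch (InterruptedException ignored) {
                // keep waiting; a non-interruptible block does not react to interrupts
            }
        }
    }

    /** Returns true if the executor terminated within the timeout. */
    static boolean runAndShutDown() throws InterruptedException {
        // Hypothetical stand-in for the blocking release operation in the test.
        CountDownLatch releaseBlocker = new CountDownLatch(1);
        ExecutorService ioExecutor = Executors.newFixedThreadPool(1);
        try {
            ioExecutor.execute(() -> awaitUninterruptibly(releaseBlocker));
            // ... the actual test assertions would go here ...
        } finally {
            // Unblock first: otherwise the worker never finishes and the pool never terminates.
            releaseBlocker.countDown();
            ioExecutor.shutdown();
        }
        return ioExecutor.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("terminated: " + runAndShutDown());
    }
}
```

       Releasing the latch in the `finally` block means the executor is shut 
down even if an assertion in the test body fails.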

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
##########
@@ -490,6 +490,13 @@
                                + " size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min/max"
                                + " size to the same value.");
 
+       @Documentation.ExcludeFromDocumentation("This option just serves as a last-ditch escape hatch.")
+       public static final ConfigOption<Integer> NUM_IO_THREADS =
+               key("taskmanager.io.threads.num")
+                       .intType()
+                       .defaultValue(2)
+                       .withDescription("The number of threads to use for non-critical IO operations.");

Review comment:
       We might be able to unify this configuration option with 
`ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
##########
@@ -265,10 +265,15 @@ public static TaskManagerServices fromConfiguration(
                // start the I/O manager, it will create some temp directories.
                final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
 
+               final ExecutorService ioExecutor = Executors.newFixedThreadPool(

Review comment:
       Can the `ioExecutor` also replace the `taskIOExecutor`?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
##########
@@ -100,6 +105,27 @@ public void testRegisterTaskWithInsufficientBuffers() throws Exception {
                testRegisterTaskWithLimitedBuffers(bufferCount);
        }
 
+       @Test
+       public void testSlowIODoesNotBlockRelease() throws Exception {
+               BlockerSync sync = new BlockerSync();

Review comment:
       I guess a `OneShotLatch` would also work here if the test threads call 
`trigger()` on it.
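
       A rough sketch of the latch pattern, assuming a plain 
`java.util.concurrent.CountDownLatch(1)` as a stand-in for Flink's 
`OneShotLatch` so that the snippet is self-contained. `countDown()` plays the 
role of `trigger()`; the class and method names are hypothetical:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchSketch {

    /** Returns true if the worker thread signalled the latch within the timeout. */
    static boolean workerSignalled() throws InterruptedException {
        // Stand-in for OneShotLatch: a latch that can be triggered exactly once.
        CountDownLatch reached = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            // The test thread "calls the trigger" when it reaches the point of interest.
            reached.countDown();
        });
        worker.start();

        // The main test thread waits until the worker has reached that point.
        boolean signalled = reached.await(10, TimeUnit.SECONDS);
        worker.join();
        return signalled;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("signalled: " + workerSignalled());
    }
}
```

       Unlike `BlockerSync`, this only signals in one direction, which is 
enough when the test merely needs to know that the other thread got started.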



