This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8e8dbb8dbd51fa896ed7258cedb955931bc0e03d
Author: Chesnay Schepler <[email protected]>
AuthorDate: Wed May 27 12:59:28 2020 +0200

    [FLINK-17558][runtime] Add Executors#newCachedThreadPool
---
 .../apache/flink/runtime/concurrent/Executors.java | 27 ++++++++++
 .../flink/runtime/concurrent/ExecutorsTest.java    | 63 ++++++++++++++++++++++
 2 files changed, 90 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
index 41d9a32..c758752 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -18,8 +18,14 @@
 
 package org.apache.flink.runtime.concurrent;
 
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.ExecutionContext;
 
@@ -61,6 +67,27 @@ public class Executors {
        }
 
        /**
+        * Returns a new cached thread pool with the desired maximum size.
+        *
+        * <p>This method is a variation of {@link 
java.util.concurrent.Executors#newFixedThreadPool(int, ThreadFactory)},
+        * with the minimum pool size set to 0.
+        * In that respect it is similar to {@link 
java.util.concurrent.Executors#newCachedThreadPool()}, but it uses a
+        * {@link LinkedBlockingQueue} instead to allow tasks to be queued, 
instead of failing with an exception if the pool
+        * is saturated.
+        *
+        * @see ExecutorThreadFactory
+        * @param maxPoolSize maximum size of the thread pool
+        * @param threadFactory thread factory to use
+        * @return new cached thread pool
+        */
+       public static ExecutorService newCachedThreadPool(int maxPoolSize, 
ThreadFactory threadFactory) {
+               return new ThreadPoolExecutor(0, maxPoolSize,
+                       60L, TimeUnit.SECONDS,
+                       new LinkedBlockingQueue<>(),
+                       threadFactory);
+       }
+
+       /**
         * Direct execution context.
         */
        private static class DirectExecutionContext implements ExecutionContext 
{
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java
new file mode 100644
index 0000000..e3be776
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.core.testutils.BlockerSync;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.testutils.executor.TestExecutorResource;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Tests for {@link Executors}.
+ */
+public class ExecutorsTest {
+
+       @Rule
+       public final TestExecutorResource executorResource = new 
TestExecutorResource(
+               () -> Executors.newCachedThreadPool(1, new 
ExecutorThreadFactory()));
+
+       /**
+        * Tests that the {@link ExecutorService} returned by {@link 
Executors#newCachedThreadPool(int, ThreadFactory)}
+        * allows tasks to be queued. In a prior implementation the executor 
used a synchronous queue, rejecting tasks with
+        * an exception if no thread was available to process it.
+        */
+       @Test
+       public void 
testNewCachedThreadPoolDoesNotRejectTasksExceedingActiveThreadCount() throws 
InterruptedException {
+               Executor executor = executorResource.getExecutor();
+
+               BlockerSync sync = new BlockerSync();
+               try {
+                       // submit the first blocking task, which should block 
the single pool thread
+                       executor.execute(sync::blockNonInterruptible);
+
+                       // the thread is now blocked
+                       sync.awaitBlocker();
+
+                       // this task should not be rejected
+                       executor.execute(sync::blockNonInterruptible);
+               } finally {
+                       sync.releaseBlocker();
+               }
+       }
+}

Reply via email to