This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8e8dbb8dbd51fa896ed7258cedb955931bc0e03d Author: Chesnay Schepler <[email protected]> AuthorDate: Wed May 27 12:59:28 2020 +0200 [FLINK-17558][runtime] Add Executors#newCachedThreadPool --- .../apache/flink/runtime/concurrent/Executors.java | 27 ++++++++++ .../flink/runtime/concurrent/ExecutorsTest.java | 63 ++++++++++++++++++++++ 2 files changed, 90 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java index 41d9a32..c758752 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java @@ -18,8 +18,14 @@ package org.apache.flink.runtime.concurrent; +import org.apache.flink.runtime.util.ExecutorThreadFactory; + import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import scala.concurrent.ExecutionContext; @@ -61,6 +67,27 @@ public class Executors { } /** + * Returns a new cached thread pool with the desired maximum size. + * + * <p>This method is a variation of {@link java.util.concurrent.Executors#newFixedThreadPool(int, ThreadFactory)}, + * with the minimum pool size set to 0. + * In that respect it is similar to {@link java.util.concurrent.Executors#newCachedThreadPool()}, but it uses a + * {@link LinkedBlockingQueue} instead to allow tasks to be queued, instead of failing with an exception if the pool + * is saturated. + * + * @see ExecutorThreadFactory + * @param maxPoolSize maximum size of the thread pool + * @param threadFactory thread factory to use + * @return new cached thread pool + */ + public static ExecutorService newCachedThreadPool(int maxPoolSize, ThreadFactory threadFactory) { + return new ThreadPoolExecutor(0, maxPoolSize, + 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + threadFactory); + } + + /** * Direct execution context. */ private static class DirectExecutionContext implements ExecutionContext { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java new file mode 100644 index 0000000..e3be776 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +import org.apache.flink.core.testutils.BlockerSync; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.testutils.executor.TestExecutorResource; + +import org.junit.Rule; +import org.junit.Test; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; + +/** + * Tests for {@link Executors}. + */ +public class ExecutorsTest { + + @Rule + public final TestExecutorResource executorResource = new TestExecutorResource( + () -> Executors.newCachedThreadPool(1, new ExecutorThreadFactory())); + + /** + * Tests that the {@link ExecutorService} returned by {@link Executors#newCachedThreadPool(int, ThreadFactory)} + * allows tasks to be queued. In a prior implementation the executor used a synchronous queue, rejecting tasks with + * an exception if no thread was available to process it. + */ + @Test + public void testNewCachedThreadPoolDoesNotRejectTasksExceedingActiveThreadCount() throws InterruptedException { + Executor executor = executorResource.getExecutor(); + + BlockerSync sync = new BlockerSync(); + try { + // submit the first blocking task, which should block the single pool thread + executor.execute(sync::blockNonInterruptible); + + // the thread is now blocked + sync.awaitBlocker(); + + // this task should not be rejected + executor.execute(sync::blockNonInterruptible); + } finally { + sync.releaseBlocker(); + } + } +}
