HADOOP-15039. Move SemaphoredDelegatingExecutor to hadoop-common. Contributed by Genmao Yu
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f86c81d9 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f86c81d9 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f86c81d9 Branch: refs/heads/trunk Commit: f86c81d923ecce9d1c9fb691bbc78e93b4a65ae7 Parents: 28792b6 Author: Kai Zheng <[email protected]> Authored: Thu Dec 14 11:14:52 2017 +0800 Committer: Kai Zheng <[email protected]> Committed: Thu Dec 14 11:14:52 2017 +0800 ---------------------------------------------------------------------- .../util/BlockingThreadPoolExecutorService.java | 169 ++++++++++++++ .../util/SemaphoredDelegatingExecutor.java | 227 ++++++++++++++++++ .../s3a/BlockingThreadPoolExecutorService.java | 170 -------------- .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 2 + .../fs/s3a/SemaphoredDelegatingExecutor.java | 230 ------------------- .../ITestBlockingThreadPoolExecutorService.java | 2 + 6 files changed, 400 insertions(+), 400 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f86c81d9/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java new file mode 100644 index 0000000..404eea9 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.MoreExecutors; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * This ExecutorService blocks the submission of new tasks when its queue is + * already full by using a semaphore. Task submissions require permits, task + * completions release permits. 
+ * <p> + * This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java"> + * this s4 threadpool</a> + */ +@InterfaceAudience.Private +public final class BlockingThreadPoolExecutorService + extends SemaphoredDelegatingExecutor { + + private static final Logger LOG = LoggerFactory + .getLogger(BlockingThreadPoolExecutorService.class); + + private static final AtomicInteger POOLNUMBER = new AtomicInteger(1); + + private final ThreadPoolExecutor eventProcessingExecutor; + + /** + * Returns a {@link java.util.concurrent.ThreadFactory} that names each + * created thread uniquely, + * with a common prefix. + * + * @param prefix The prefix of every created Thread's name + * @return a {@link java.util.concurrent.ThreadFactory} that names threads + */ + static ThreadFactory getNamedThreadFactory(final String prefix) { + SecurityManager s = System.getSecurityManager(); + final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + + return new ThreadFactory() { + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final int poolNum = POOLNUMBER.getAndIncrement(); + private final ThreadGroup group = threadGroup; + + @Override + public Thread newThread(Runnable r) { + final String name = + prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement(); + return new Thread(group, r, name); + } + }; + } + + /** + * Get a named {@link ThreadFactory} that just builds daemon threads. 
+ * + * @param prefix name prefix for all threads created from the factory + * @return a thread factory that creates named, daemon threads with + * the supplied exception handler and normal priority + */ + public static ThreadFactory newDaemonThreadFactory(final String prefix) { + final ThreadFactory namedFactory = getNamedThreadFactory(prefix); + return new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = namedFactory.newThread(r); + if (!t.isDaemon()) { + t.setDaemon(true); + } + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } + + }; + } + + private BlockingThreadPoolExecutorService(int permitCount, + ThreadPoolExecutor eventProcessingExecutor) { + super(MoreExecutors.listeningDecorator(eventProcessingExecutor), + permitCount, false); + this.eventProcessingExecutor = eventProcessingExecutor; + } + + /** + * A thread pool that blocks clients submitting additional tasks if + * there are already {@code activeTasks} running threads and {@code + * waitingTasks} tasks waiting in its queue. + * + * @param activeTasks maximum number of active tasks + * @param waitingTasks maximum number of waiting tasks + * @param keepAliveTime time until threads are cleaned up in {@code unit} + * @param unit time unit + * @param prefixName prefix of name for threads + */ + public static BlockingThreadPoolExecutorService newInstance( + int activeTasks, + int waitingTasks, + long keepAliveTime, TimeUnit unit, + String prefixName) { + + /* Although we generally only expect up to waitingTasks tasks in the + queue, we need to be able to buffer all tasks in case dequeueing is + slower than enqueueing. 
*/ + final BlockingQueue<Runnable> workQueue = + new LinkedBlockingQueue<>(waitingTasks + activeTasks); + ThreadPoolExecutor eventProcessingExecutor = + new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit, + workQueue, newDaemonThreadFactory(prefixName), + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, + ThreadPoolExecutor executor) { + // This is not expected to happen. + LOG.error("Could not submit task to executor {}", + executor.toString()); + } + }); + eventProcessingExecutor.allowCoreThreadTimeOut(true); + return new BlockingThreadPoolExecutorService(waitingTasks + activeTasks, + eventProcessingExecutor); + } + + /** + * Get the actual number of active threads. + * @return the active thread count + */ + int getActiveCount() { + return eventProcessingExecutor.getActiveCount(); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "BlockingThreadPoolExecutorService{"); + sb.append(super.toString()); + sb.append(", activeCount=").append(getActiveCount()); + sb.append('}'); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f86c81d9/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java new file mode 100644 index 0000000..bcc19e3 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import com.google.common.util.concurrent.ForwardingListeningExecutorService; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; + +import org.apache.hadoop.classification.InterfaceAudience; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * This ExecutorService blocks the submission of new tasks when its queue is + * already full by using a semaphore. Task submissions require permits, task + * completions release permits. + * <p> + * This is a refactoring of {@link BlockingThreadPoolExecutorService}; that code + * contains the thread pool logic, whereas this isolates the semaphore + * and submit logic for use with other thread pools and delegation models. 
+ * <p> + * This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java"> + * this s4 threadpool</a> + */ +@SuppressWarnings("NullableProblems") +@InterfaceAudience.Private +public class SemaphoredDelegatingExecutor extends + ForwardingListeningExecutorService { + + private final Semaphore queueingPermits; + private final ListeningExecutorService executorDelegatee; + private final int permitCount; + + /** + * Instantiate. + * @param executorDelegatee Executor to delegate to + * @param permitCount number of permits into the queue permitted + * @param fair should the semaphore be "fair" + */ + public SemaphoredDelegatingExecutor( + ListeningExecutorService executorDelegatee, + int permitCount, + boolean fair) { + this.permitCount = permitCount; + queueingPermits = new Semaphore(permitCount, fair); + this.executorDelegatee = executorDelegatee; + } + + @Override + protected ListeningExecutorService delegate() { + return executorDelegatee; + } + + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) + throws InterruptedException { + throw new RuntimeException("Not implemented"); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, + long timeout, TimeUnit unit) throws InterruptedException { + throw new RuntimeException("Not implemented"); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException { + throw new RuntimeException("Not implemented"); + } + + @Override + public <T> T invokeAny(Collection<? 
extends Callable<T>> tasks, long timeout, + TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + throw new RuntimeException("Not implemented"); + } + + @Override + public <T> ListenableFuture<T> submit(Callable<T> task) { + try { + queueingPermits.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return Futures.immediateFailedCheckedFuture(e); + } + return super.submit(new CallableWithPermitRelease<>(task)); + } + + @Override + public <T> ListenableFuture<T> submit(Runnable task, T result) { + try { + queueingPermits.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return Futures.immediateFailedCheckedFuture(e); + } + return super.submit(new RunnableWithPermitRelease(task), result); + } + + @Override + public ListenableFuture<?> submit(Runnable task) { + try { + queueingPermits.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return Futures.immediateFailedCheckedFuture(e); + } + return super.submit(new RunnableWithPermitRelease(task)); + } + + @Override + public void execute(Runnable command) { + try { + queueingPermits.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + super.execute(new RunnableWithPermitRelease(command)); + } + + /** + * Get the number of permits available; guaranteed to be + * {@code 0 <= availablePermits <= size}. + * @return the number of permits available at the time of invocation. + */ + public int getAvailablePermits() { + return queueingPermits.availablePermits(); + } + + /** + * Get the number of threads waiting to acquire a permit. + * @return snapshot of the length of the queue of blocked threads. + */ + public int getWaitingCount() { + return queueingPermits.getQueueLength(); + } + + /** + * Total number of permits. 
+ * @return the number of permits as set in the constructor + */ + public int getPermitCount() { + return permitCount; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "SemaphoredDelegatingExecutor{"); + sb.append("permitCount=").append(getPermitCount()); + sb.append(", available=").append(getAvailablePermits()); + sb.append(", waiting=").append(getWaitingCount()); + sb.append('}'); + return sb.toString(); + } + + /** + * Releases a permit after the task is executed. + */ + class RunnableWithPermitRelease implements Runnable { + + private Runnable delegatee; + + RunnableWithPermitRelease(Runnable delegatee) { + this.delegatee = delegatee; + } + + @Override + public void run() { + try { + delegatee.run(); + } finally { + queueingPermits.release(); + } + + } + } + + /** + * Releases a permit after the task is completed. + */ + class CallableWithPermitRelease<T> implements Callable<T> { + + private Callable<T> delegatee; + + CallableWithPermitRelease(Callable<T> delegatee) { + this.delegatee = delegatee; + } + + @Override + public T call() throws Exception { + try { + return delegatee.call(); + } finally { + queueingPermits.release(); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f86c81d9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java deleted file mode 100644 index f13942d..0000000 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java +++ /dev/null @@ -1,170 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.util.concurrent.MoreExecutors; - -import org.apache.hadoop.classification.InterfaceAudience; - -/** - * This ExecutorService blocks the submission of new tasks when its queue is - * already full by using a semaphore. Task submissions require permits, task - * completions release permits. 
- * <p> - * This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java"> - * this s4 threadpool</a> - */ -@InterfaceAudience.Private -final class BlockingThreadPoolExecutorService - extends SemaphoredDelegatingExecutor { - - private static final Logger LOG = LoggerFactory - .getLogger(BlockingThreadPoolExecutorService.class); - - private static final AtomicInteger POOLNUMBER = new AtomicInteger(1); - - private final ThreadPoolExecutor eventProcessingExecutor; - - /** - * Returns a {@link java.util.concurrent.ThreadFactory} that names each - * created thread uniquely, - * with a common prefix. - * - * @param prefix The prefix of every created Thread's name - * @return a {@link java.util.concurrent.ThreadFactory} that names threads - */ - static ThreadFactory getNamedThreadFactory(final String prefix) { - SecurityManager s = System.getSecurityManager(); - final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : - Thread.currentThread().getThreadGroup(); - - return new ThreadFactory() { - private final AtomicInteger threadNumber = new AtomicInteger(1); - private final int poolNum = POOLNUMBER.getAndIncrement(); - private final ThreadGroup group = threadGroup; - - @Override - public Thread newThread(Runnable r) { - final String name = - prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement(); - return new Thread(group, r, name); - } - }; - } - - /** - * Get a named {@link ThreadFactory} that just builds daemon threads. 
- * - * @param prefix name prefix for all threads created from the factory - * @return a thread factory that creates named, daemon threads with - * the supplied exception handler and normal priority - */ - static ThreadFactory newDaemonThreadFactory(final String prefix) { - final ThreadFactory namedFactory = getNamedThreadFactory(prefix); - return new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = namedFactory.newThread(r); - if (!t.isDaemon()) { - t.setDaemon(true); - } - if (t.getPriority() != Thread.NORM_PRIORITY) { - t.setPriority(Thread.NORM_PRIORITY); - } - return t; - } - - }; - } - - private BlockingThreadPoolExecutorService(int permitCount, - ThreadPoolExecutor eventProcessingExecutor) { - super(MoreExecutors.listeningDecorator(eventProcessingExecutor), - permitCount, false); - this.eventProcessingExecutor = eventProcessingExecutor; - } - - /** - * A thread pool that that blocks clients submitting additional tasks if - * there are already {@code activeTasks} running threads and {@code - * waitingTasks} tasks waiting in its queue. - * - * @param activeTasks maximum number of active tasks - * @param waitingTasks maximum number of waiting tasks - * @param keepAliveTime time until threads are cleaned up in {@code unit} - * @param unit time unit - * @param prefixName prefix of name for threads - */ - public static BlockingThreadPoolExecutorService newInstance( - int activeTasks, - int waitingTasks, - long keepAliveTime, TimeUnit unit, - String prefixName) { - - /* Although we generally only expect up to waitingTasks tasks in the - queue, we need to be able to buffer all tasks in case dequeueing is - slower than enqueueing. 
*/ - final BlockingQueue<Runnable> workQueue = - new LinkedBlockingQueue<>(waitingTasks + activeTasks); - ThreadPoolExecutor eventProcessingExecutor = - new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit, - workQueue, newDaemonThreadFactory(prefixName), - new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable r, - ThreadPoolExecutor executor) { - // This is not expected to happen. - LOG.error("Could not submit task to executor {}", - executor.toString()); - } - }); - eventProcessingExecutor.allowCoreThreadTimeOut(true); - return new BlockingThreadPoolExecutorService(waitingTasks + activeTasks, - eventProcessingExecutor); - } - - /** - * Get the actual number of active threads. - * @return the active thread count - */ - int getActiveCount() { - return eventProcessingExecutor.getActiveCount(); - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder( - "BlockingThreadPoolExecutorService{"); - sb.append(super.toString()); - sb.append(", activeCount=").append(getActiveCount()); - sb.append('}'); - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f86c81d9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 63a4349..e927758 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -112,8 +112,10 @@ import org.apache.hadoop.fs.s3a.s3guard.S3Guard; import org.apache.hadoop.fs.s3native.S3xLoginHelper; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.security.UserGroupInformation; +import 
org.apache.hadoop.util.BlockingThreadPoolExecutorService; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.SemaphoredDelegatingExecutor; import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.Invoker.*; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f86c81d9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java deleted file mode 100644 index 6b21912..0000000 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.fs.s3a; - -import com.google.common.util.concurrent.ForwardingListeningExecutorService; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; - -import org.apache.hadoop.classification.InterfaceAudience; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * This ExecutorService blocks the submission of new tasks when its queue is - * already full by using a semaphore. Task submissions require permits, task - * completions release permits. - * <p> - * This is a refactoring of {@link BlockingThreadPoolExecutorService}; that code - * contains the thread pool logic, whereas this isolates the semaphore - * and submit logic for use with other thread pools and delegation models. - * In particular, it <i>permits multiple per stream executors to share a - * single per-FS-instance executor; the latter to throttle overall - * load from the the FS, the others to limit the amount of load which - * a single output stream can generate.</i> - * <p> - * This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java"> - * this s4 threadpool</a> - */ -@SuppressWarnings("NullableProblems") -@InterfaceAudience.Private -class SemaphoredDelegatingExecutor extends - ForwardingListeningExecutorService { - - private final Semaphore queueingPermits; - private final ListeningExecutorService executorDelegatee; - private final int permitCount; - - /** - * Instantiate. 
- * @param executorDelegatee Executor to delegate to - * @param permitCount number of permits into the queue permitted - * @param fair should the semaphore be "fair" - */ - SemaphoredDelegatingExecutor(ListeningExecutorService executorDelegatee, - int permitCount, - boolean fair) { - this.permitCount = permitCount; - queueingPermits = new Semaphore(permitCount, fair); - this.executorDelegatee = executorDelegatee; - } - - @Override - protected ListeningExecutorService delegate() { - return executorDelegatee; - } - - - @Override - public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) - throws InterruptedException { - throw new RuntimeException("Not implemented"); - } - - @Override - public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, - long timeout, TimeUnit unit) throws InterruptedException { - throw new RuntimeException("Not implemented"); - } - - @Override - public <T> T invokeAny(Collection<? extends Callable<T>> tasks) - throws InterruptedException, ExecutionException { - throw new RuntimeException("Not implemented"); - } - - @Override - public <T> T invokeAny(Collection<? 
extends Callable<T>> tasks, long timeout, - TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - throw new RuntimeException("Not implemented"); - } - - @Override - public <T> ListenableFuture<T> submit(Callable<T> task) { - try { - queueingPermits.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return Futures.immediateFailedCheckedFuture(e); - } - return super.submit(new CallableWithPermitRelease<>(task)); - } - - @Override - public <T> ListenableFuture<T> submit(Runnable task, T result) { - try { - queueingPermits.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return Futures.immediateFailedCheckedFuture(e); - } - return super.submit(new RunnableWithPermitRelease(task), result); - } - - @Override - public ListenableFuture<?> submit(Runnable task) { - try { - queueingPermits.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return Futures.immediateFailedCheckedFuture(e); - } - return super.submit(new RunnableWithPermitRelease(task)); - } - - @Override - public void execute(Runnable command) { - try { - queueingPermits.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - super.execute(new RunnableWithPermitRelease(command)); - } - - /** - * Get the number of permits available; guaranteed to be - * {@code 0 <= availablePermits <= size}. - * @return the number of permits available at the time of invocation. - */ - public int getAvailablePermits() { - return queueingPermits.availablePermits(); - } - - /** - * Get the number of threads waiting to acquire a permit. - * @return snapshot of the length of the queue of blocked threads. - */ - public int getWaitingCount() { - return queueingPermits.getQueueLength(); - } - - /** - * Total number of permits. 
- * @return the number of permits as set in the constructor - */ - public int getPermitCount() { - return permitCount; - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder( - "SemaphoredDelegatingExecutor{"); - sb.append("permitCount=").append(getPermitCount()); - sb.append(", available=").append(getAvailablePermits()); - sb.append(", waiting=").append(getWaitingCount()); - sb.append('}'); - return sb.toString(); - } - - /** - * Releases a permit after the task is executed. - */ - class RunnableWithPermitRelease implements Runnable { - - private Runnable delegatee; - - public RunnableWithPermitRelease(Runnable delegatee) { - this.delegatee = delegatee; - } - - @Override - public void run() { - try { - delegatee.run(); - } finally { - queueingPermits.release(); - } - - } - } - - /** - * Releases a permit after the task is completed. - */ - class CallableWithPermitRelease<T> implements Callable<T> { - - private Callable<T> delegatee; - - public CallableWithPermitRelease(Callable<T> delegatee) { - this.delegatee = delegatee; - } - - @Override - public T call() throws Exception { - try { - return delegatee.call(); - } finally { - queueingPermits.release(); - } - } - - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f86c81d9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java index b1b8240..3dfe286 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java @@ -19,6 +19,8 @@ package 
org.apache.hadoop.fs.s3a; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.hadoop.util.BlockingThreadPoolExecutorService; +import org.apache.hadoop.util.SemaphoredDelegatingExecutor; import org.apache.hadoop.util.StopWatch; import org.junit.AfterClass; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
