HDFS-10909. De-duplicate code in ErasureCodingWorker#initializeStripedReadThreadPool and DFSClient#initThreadsNumForStripedReads. (Manoj Govindassamy via lei)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b5920619 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b5920619 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b5920619 Branch: refs/heads/YARN-5355 Commit: b59206190e6f773fc223bcb81774a09715551367 Parents: 9b0c17f Author: Lei Xu <l...@apache.org> Authored: Wed Nov 2 16:45:25 2016 -0700 Committer: Lei Xu <l...@apache.org> Committed: Wed Nov 2 16:45:25 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hdfs/DFSClient.java | 30 ++---------- .../org/apache/hadoop/hdfs/DFSUtilClient.java | 50 ++++++++++++++++++++ .../erasurecode/ErasureCodingWorker.java | 23 +++------ 3 files changed, 61 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5920619/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 93c0ff0..0128b07 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -2800,37 +2800,17 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, /** * Create thread pool for parallel reading in striped layout, * STRIPED_READ_THREAD_POOL, if it does not already exist. - * @param num Number of threads for striped reads thread pool. + * @param numThreads Number of threads for striped reads thread pool. */ - private void initThreadsNumForStripedReads(int num) { - assert num > 0; + private void initThreadsNumForStripedReads(int numThreads) { + assert numThreads > 0; if (STRIPED_READ_THREAD_POOL != null) { return; } synchronized (DFSClient.class) { if (STRIPED_READ_THREAD_POOL == null) { - STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60, - TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), - new Daemon.DaemonFactory() { - private final AtomicInteger threadIndex = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - Thread t = super.newThread(r); - t.setName("stripedRead-" + threadIndex.getAndIncrement()); - return t; - } - }, - new ThreadPoolExecutor.CallerRunsPolicy() { - @Override - public void rejectedExecution(Runnable runnable, - ThreadPoolExecutor e) { - LOG.info("Execution for striped reading rejected, " - + "Executing in current thread"); - // will run in the current thread - super.rejectedExecution(runnable, e); - } - }); + STRIPED_READ_THREAD_POOL = DFSUtilClient.getThreadPoolExecutor(1, + numThreads, 60, "StripedRead-", true); STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5920619/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java index b93632a..d267530 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.common.primitives.SignedBytes; import org.apache.hadoop.conf.Configuration; @@ -51,6 +52,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.KMSUtil; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; @@ -81,6 +83,10 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY; @@ -776,4 +782,48 @@ public class DFSUtilClient { DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY, DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT); } + + /** + * Utility to create a {@link ThreadPoolExecutor}. + * + * @param corePoolSize - min threads in the pool, even if idle + * @param maxPoolSize - max threads in the pool + * @param keepAliveTimeSecs - max seconds beyond which excess idle threads + * will be terminated + * @param threadNamePrefix - name prefix for the pool threads + * @param runRejectedExec - when true, rejected tasks from + * ThreadPoolExecutor are run in the context of calling thread + * @return ThreadPoolExecutor + */ + public static ThreadPoolExecutor getThreadPoolExecutor(int corePoolSize, + int maxPoolSize, long keepAliveTimeSecs, String threadNamePrefix, + boolean runRejectedExec) { + Preconditions.checkArgument(corePoolSize > 0); + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, + maxPoolSize, keepAliveTimeSecs, TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(), new Daemon.DaemonFactory() { + private final AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread t = super.newThread(r); + t.setName(threadNamePrefix + threadIndex.getAndIncrement()); + return t; + } + }); + if (runRejectedExec) { + threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor + .CallerRunsPolicy() { + @Override + public void rejectedExecution(Runnable runnable, + ThreadPoolExecutor e) { + LOG.info(threadNamePrefix + " task is rejected by " + + "ThreadPoolExecutor. Executing it in current thread."); + // will run in the current thread + super.rejectedExecution(runnable, e); + } + }); + } + return threadPoolExecutor; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5920619/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index aacbb2d..1492e5d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -20,13 +20,13 @@ package org.apache.hadoop.hdfs.server.datanode.erasurecode; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; import java.util.Collection; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -89,22 +89,11 @@ public final class ErasureCodingWorker { stripedReadPool.allowCoreThreadTimeOut(true); } - private void initializeStripedBlkReconstructionThreadPool(int num) { - LOG.debug("Using striped block reconstruction; pool threads={}", num); - stripedReconstructionPool = new ThreadPoolExecutor(2, num, 60, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), - new Daemon.DaemonFactory() { - private final AtomicInteger threadIdx = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - Thread t = super.newThread(r); - t.setName("stripedBlockReconstruction-" - + threadIdx.getAndIncrement()); - return t; - } - }); + private void initializeStripedBlkReconstructionThreadPool(int numThreads) { + LOG.debug("Using striped block reconstruction; pool threads={}", + numThreads); + stripedReconstructionPool = DFSUtilClient.getThreadPoolExecutor(2, + numThreads, 60, "StripedBlockReconstruction-", false); stripedReconstructionPool.allowCoreThreadTimeOut(true); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org