HDFS-10909. De-duplicate code in ErasureCodingWorker#initializeStripedReadThreadPool and DFSClient#initThreadsNumForStripedReads. (Manoj Govindassamy via lei)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b5920619
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b5920619
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b5920619

Branch: refs/heads/YARN-5355
Commit: b59206190e6f773fc223bcb81774a09715551367
Parents: 9b0c17f
Author: Lei Xu <l...@apache.org>
Authored: Wed Nov 2 16:45:25 2016 -0700
Committer: Lei Xu <l...@apache.org>
Committed: Wed Nov 2 16:45:25 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hdfs/DFSClient.java  | 30 ++----------
 .../org/apache/hadoop/hdfs/DFSUtilClient.java   | 50 ++++++++++++++++++++
 .../erasurecode/ErasureCodingWorker.java        | 23 +++------
 3 files changed, 61 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5920619/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 93c0ff0..0128b07 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -2800,37 +2800,17 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   /**
    * Create thread pool for parallel reading in striped layout,
    * STRIPED_READ_THREAD_POOL, if it does not already exist.
-   * @param num Number of threads for striped reads thread pool.
+   * @param numThreads Number of threads for striped reads thread pool.
    */
-  private void initThreadsNumForStripedReads(int num) {
-    assert num > 0;
+  private void initThreadsNumForStripedReads(int numThreads) {
+    assert numThreads > 0;
     if (STRIPED_READ_THREAD_POOL != null) {
       return;
     }
     synchronized (DFSClient.class) {
       if (STRIPED_READ_THREAD_POOL == null) {
-        STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
-            TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
-            new Daemon.DaemonFactory() {
-              private final AtomicInteger threadIndex = new AtomicInteger(0);
-
-              @Override
-              public Thread newThread(Runnable r) {
-                Thread t = super.newThread(r);
-                t.setName("stripedRead-" + threadIndex.getAndIncrement());
-                return t;
-              }
-            },
-            new ThreadPoolExecutor.CallerRunsPolicy() {
-              @Override
-              public void rejectedExecution(Runnable runnable,
-                  ThreadPoolExecutor e) {
-                LOG.info("Execution for striped reading rejected, "
-                    + "Executing in current thread");
-                // will run in the current thread
-                super.rejectedExecution(runnable, e);
-              }
-            });
+        STRIPED_READ_THREAD_POOL = DFSUtilClient.getThreadPoolExecutor(1,
+            numThreads, 60, "StripedRead-", true);
         STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5920619/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index b93632a..d267530 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.primitives.SignedBytes;
 import org.apache.hadoop.conf.Configuration;
@@ -51,6 +52,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.KMSUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
@@ -81,6 +83,10 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY;
@@ -776,4 +782,48 @@ public class DFSUtilClient {
         DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY,
         DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT);
   }
+
+  /**
+   * Utility to create a {@link ThreadPoolExecutor}.
+   *
+   * @param corePoolSize - min threads in the pool, even if idle
+   * @param maxPoolSize - max threads in the pool
+   * @param keepAliveTimeSecs - max seconds beyond which excess idle threads
+   *        will be terminated
+   * @param threadNamePrefix - name prefix for the pool threads
+   * @param runRejectedExec - when true, rejected tasks from
+   *        ThreadPoolExecutor are run in the context of calling thread
+   * @return ThreadPoolExecutor
+   */
+  public static ThreadPoolExecutor getThreadPoolExecutor(int corePoolSize,
+      int maxPoolSize, long keepAliveTimeSecs, String threadNamePrefix,
+      boolean runRejectedExec) {
+    Preconditions.checkArgument(corePoolSize > 0);
+    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
+        maxPoolSize, keepAliveTimeSecs, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(), new Daemon.DaemonFactory() {
+          private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+          @Override
+          public Thread newThread(Runnable r) {
+            Thread t = super.newThread(r);
+            t.setName(threadNamePrefix + threadIndex.getAndIncrement());
+            return t;
+          }
+        });
+    if (runRejectedExec) {
+      threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor
+          .CallerRunsPolicy() {
+        @Override
+        public void rejectedExecution(Runnable runnable,
+            ThreadPoolExecutor e) {
+          LOG.info(threadNamePrefix + " task is rejected by " +
+                  "ThreadPoolExecutor. Executing it in current thread.");
+          // will run in the current thread
+          super.rejectedExecution(runnable, e);
+        }
+      });
+    }
+    return threadPoolExecutor;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5920619/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index aacbb2d..1492e5d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -20,13 +20,13 @@ package org.apache.hadoop.hdfs.server.datanode.erasurecode;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 
 import java.util.Collection;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -89,22 +89,11 @@ public final class ErasureCodingWorker {
     stripedReadPool.allowCoreThreadTimeOut(true);
   }
 
-  private void initializeStripedBlkReconstructionThreadPool(int num) {
-    LOG.debug("Using striped block reconstruction; pool threads={}", num);
-    stripedReconstructionPool = new ThreadPoolExecutor(2, num, 60,
-        TimeUnit.SECONDS,
-        new LinkedBlockingQueue<Runnable>(),
-        new Daemon.DaemonFactory() {
-          private final AtomicInteger threadIdx = new AtomicInteger(0);
-
-          @Override
-          public Thread newThread(Runnable r) {
-            Thread t = super.newThread(r);
-            t.setName("stripedBlockReconstruction-"
-                + threadIdx.getAndIncrement());
-            return t;
-          }
-        });
+  private void initializeStripedBlkReconstructionThreadPool(int numThreads) {
+    LOG.debug("Using striped block reconstruction; pool threads={}",
+        numThreads);
+    stripedReconstructionPool = DFSUtilClient.getThreadPoolExecutor(2,
+        numThreads, 60, "StripedBlockReconstruction-", false);
     stripedReconstructionPool.allowCoreThreadTimeOut(true);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to