HADOOP-11446 S3AOutputStream should use shared thread pool to avoid OutOfMemoryError


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/27d83958
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/27d83958
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/27d83958

Branch: refs/heads/trunk
Commit: 27d8395867f665fea1360087325cda5ed70efd0c
Parents: 21c6f01
Author: Steve Loughran <ste...@apache.org>
Authored: Mon Jan 5 12:59:48 2015 +0000
Committer: Steve Loughran <ste...@apache.org>
Committed: Mon Jan 5 13:00:01 2015 +0000

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 +
 .../org/apache/hadoop/fs/s3a/Constants.java     | 17 ++++
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 91 ++++++++++++++++++--
 .../apache/hadoop/fs/s3a/S3AOutputStream.java   | 13 +--
 4 files changed, 109 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d83958/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index ec75e8d..baf68d6 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -663,6 +663,9 @@ Release 2.7.0 - UNRELEASED
     HADOOP-11039. ByteBufferReadable API doc is inconsistent with the
     implementations. (Yi Liu via Colin P. McCabe)
 
+    HADOOP-11446. S3AOutputStream should use shared thread pool to
+    avoid OutOfMemoryError. (Ted Yu via stevel)        
+    
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d83958/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index ee4bf68..f1b5d3d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -41,6 +41,23 @@ public class Constants {
   public static final String MAX_PAGING_KEYS = "fs.s3a.paging.maximum";
   public static final int DEFAULT_MAX_PAGING_KEYS = 5000;
 
+  // the maximum number of threads to allow in the pool used by TransferManager
+  public static final String MAX_THREADS = "fs.s3a.threads.max";
+  public static final int DEFAULT_MAX_THREADS = 256;
+
+  // the number of threads to keep in the pool used by TransferManager
+  public static final String CORE_THREADS = "fs.s3a.threads.core";
+  public static final int DEFAULT_CORE_THREADS = DEFAULT_MAXIMUM_CONNECTIONS;
+
+  // when the number of threads is greater than the core, this is the maximum time
+  // that excess idle threads will wait for new tasks before terminating.
+  public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime";
+  public static final int DEFAULT_KEEPALIVE_TIME = 60;
+
+  // the maximum number of tasks that the LinkedBlockingQueue can hold
+  public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks";
+  public static final int DEFAULT_MAX_TOTAL_TASKS = 1000;
+
   // size of each of the multipart pieces in bytes
   public static final String MULTIPART_SIZE = "fs.s3a.multipart.size";
   public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB
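
For illustration, the new keys above can be tuned per-job through the normal
Hadoop Configuration API. A minimal sketch, assuming hypothetical values and
a hypothetical bucket name (the key strings are the ones defined in
Constants.java above):

    // Sketch only: values are arbitrary examples, not recommended defaults.
    Configuration conf = new Configuration();
    conf.setInt("fs.s3a.threads.max", 64);             // MAX_THREADS
    conf.setInt("fs.s3a.threads.core", 15);            // CORE_THREADS
    conf.setInt("fs.s3a.threads.keepalivetime", 30);   // idle seconds before a thread exits
    conf.setInt("fs.s3a.max.total.tasks", 500);        // multiplied by maxThreads to size the work queue
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);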

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d83958/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 457351d..e6b1557 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -26,6 +26,11 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.fs.s3.S3Credentials;
 
@@ -77,6 +82,7 @@ public class S3AFileSystem extends FileSystem {
   private String bucket;
   private int maxKeys;
   private long partSize;
+  private TransferManager transfers;
   private int partSizeThreshold;
   public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
   private CannedAccessControlList cannedACL;
@@ -85,6 +91,55 @@ public class S3AFileSystem extends FileSystem {
   // The maximum number of entries that can be deleted in any call to s3
   private static final int MAX_ENTRIES_TO_DELETE = 1000;
 
+  private static final AtomicInteger poolNumber = new AtomicInteger(1);
+  /**
+   * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
+   * with a common prefix.
+   * @param prefix The prefix of every created Thread's name
+   * @return a {@link java.util.concurrent.ThreadFactory} that names threads
+   */
+  public static ThreadFactory getNamedThreadFactory(final String prefix) {
+    SecurityManager s = System.getSecurityManager();
+    final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
+        .getThreadGroup();
+
+    return new ThreadFactory() {
+      final AtomicInteger threadNumber = new AtomicInteger(1);
+      private final int poolNum = poolNumber.getAndIncrement();
+      final ThreadGroup group = threadGroup;
+
+      @Override
+      public Thread newThread(Runnable r) {
+        final String name = prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
+        return new Thread(group, r, name);
+      }
+    };
+  }
+
+  /**
+   * Get a named {@link ThreadFactory} that just builds daemon threads.
+   * @param prefix name prefix for all threads created from the factory
+   * @return a thread factory that creates named, daemon threads with
+   *         the supplied exception handler and normal priority
+   */
+  private static ThreadFactory newDaemonThreadFactory(final String prefix) {
+    final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
+    return new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = namedFactory.newThread(r);
+        if (!t.isDaemon()) {
+          t.setDaemon(true);
+        }
+        if (t.getPriority() != Thread.NORM_PRIORITY) {
+          t.setPriority(Thread.NORM_PRIORITY);
+        }
+        return t;
+      }
+
+    };
+  }
+
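
As an aside (not part of the patch): given the two helpers above, the factory
produces named daemon threads at normal priority. Note that because
initialize() passes a prefix that already ends in "-", the generated names
carry a double dash. A minimal sketch, assuming the helpers are in scope:

    // Sketch only, Java 7 style to match the surrounding code.
    ThreadFactory factory = newDaemonThreadFactory("s3a-transfer-shared-");
    Thread t = factory.newThread(new Runnable() {
      @Override
      public void run() { /* no-op */ }
    });
    // t.getName()     -> "s3a-transfer-shared--pool1-t1" (first pool, first thread)
    // t.isDaemon()    -> true
    // t.getPriority() -> Thread.NORM_PRIORITY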
   /** Called after a new FileSystem instance is constructed.
    * @param name a uri whose authority section names the host, port, etc.
    *   for this FileSystem
@@ -93,7 +148,6 @@ public class S3AFileSystem extends FileSystem {
   public void initialize(URI name, Configuration conf) throws IOException {
     super.initialize(name, conf);
 
-
     uri = URI.create(name.getScheme() + "://" + name.getAuthority());
     workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri,
         this.getWorkingDirectory());
@@ -138,6 +192,34 @@ public class S3AFileSystem extends FileSystem {
       partSizeThreshold = 5 * 1024 * 1024;
     }
 
+    int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
+    int coreThreads = conf.getInt(CORE_THREADS, DEFAULT_CORE_THREADS);
+    if (maxThreads == 0) {
+      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
+    }
+    if (coreThreads == 0) {
+      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
+    }
+    long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
+    LinkedBlockingQueue<Runnable> workQueue =
+      new LinkedBlockingQueue<Runnable>(maxThreads *
+        conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS));
+    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
+        coreThreads,
+        maxThreads,
+        keepAliveTime,
+        TimeUnit.SECONDS,
+        workQueue,
+        newDaemonThreadFactory("s3a-transfer-shared-"));
+    tpe.allowCoreThreadTimeOut(true);
+
+    TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
+    transferConfiguration.setMinimumUploadPartSize(partSize);
+    transferConfiguration.setMultipartUploadThreshold(partSizeThreshold);
+
+    transfers = new TransferManager(s3, tpe);
+    transfers.setConfiguration(transferConfiguration);
+
     String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL);
     if (!cannedACLName.isEmpty()) {
       cannedACL = CannedAccessControlList.valueOf(cannedACLName);
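
One behavioural note on the executor built above (an observation about
java.util.concurrent semantics, not something the patch changes): the default
rejection policy of ThreadPoolExecutor is AbortPolicy, so once all maxThreads
workers are busy and the bounded LinkedBlockingQueue is full, further submits
fail fast rather than block. A tiny standalone sketch with made-up sizes:

    // Sketch only: 1 worker, queue capacity 1, default AbortPolicy.
    // 'longTask' is a placeholder for some long-running Runnable.
    ThreadPoolExecutor tiny = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(1),
        newDaemonThreadFactory("demo-"));
    tiny.allowCoreThreadTimeOut(true);
    tiny.execute(longTask);   // occupies the single worker
    tiny.execute(longTask);   // parks in the queue
    tiny.execute(longTask);   // throws RejectedExecutionException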
@@ -155,11 +237,10 @@ public class S3AFileSystem extends FileSystem {
       DEFAULT_PURGE_EXISTING_MULTIPART_AGE);
 
     if (purgeExistingMultipart) {
-      TransferManager transferManager = new TransferManager(s3);
       Date purgeBefore = new Date(new Date().getTime() - purgeExistingMultipartAge*1000);
 
-      transferManager.abortMultipartUploads(bucket, purgeBefore);
-      transferManager.shutdownNow(false);
+      transfers.abortMultipartUploads(bucket, purgeBefore);
+      transfers.shutdownNow(false);
     }
 
     serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM);
@@ -245,7 +326,7 @@ public class S3AFileSystem extends FileSystem {
     }
 
     // We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file
-    return new FSDataOutputStream(new S3AOutputStream(getConf(), s3, this, 
+    return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this,
       bucket, key, progress, cannedACL, statistics, 
       serverSideEncryptionAlgorithm), null);
   }
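
The net effect of the changes in this file: each S3AFileSystem instance now
owns one shared TransferManager, so every output stream it creates draws on
the same bounded pool instead of spinning up its own. A hypothetical usage
sketch (the path, loop bound and 'data' byte array are placeholders):

    // Sketch only: all of these writes are served by the one shared pool.
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
    for (int i = 0; i < 1000; i++) {
      FSDataOutputStream out = fs.create(new Path("/tmp/part-" + i));
      out.write(data);   // buffered locally by S3AOutputStream
      out.close();       // upload handed to the shared TransferManager
    }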

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d83958/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
index 7783b99..2b611b6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
@@ -49,7 +49,7 @@ public class S3AOutputStream extends OutputStream {
   private boolean closed;
   private String key;
   private String bucket;
-  private AmazonS3Client client;
+  private TransferManager transfers;
   private Progressable progress;
   private long partSize;
   private int partSizeThreshold;
@@ -61,14 +61,14 @@ public class S3AOutputStream extends OutputStream {
 
   public static final Logger LOG = S3AFileSystem.LOG;
 
-  public S3AOutputStream(Configuration conf, AmazonS3Client client, 
+  public S3AOutputStream(Configuration conf, TransferManager transfers,
     S3AFileSystem fs, String bucket, String key, Progressable progress, 
     CannedAccessControlList cannedACL, FileSystem.Statistics statistics, 
     String serverSideEncryptionAlgorithm)
       throws IOException {
     this.bucket = bucket;
     this.key = key;
-    this.client = client;
+    this.transfers = transfers;
     this.progress = progress;
     this.fs = fs;
     this.cannedACL = cannedACL;
@@ -114,13 +114,6 @@ public class S3AOutputStream extends OutputStream {
 
 
     try {
-      TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
-      transferConfiguration.setMinimumUploadPartSize(partSize);
-      transferConfiguration.setMultipartUploadThreshold(partSizeThreshold);
-
-      TransferManager transfers = new TransferManager(client);
-      transfers.setConfiguration(transferConfiguration);
-
       final ObjectMetadata om = new ObjectMetadata();
       if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
         om.setServerSideEncryption(serverSideEncryptionAlgorithm);
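
The rest of close() falls outside this hunk; a plausible reading of the
post-patch upload path, assuming the standard AWS SDK TransferManager API
(the request construction below is an illustration, not a quote of the
commit):

    // Sketch only: close() hands the locally buffered file to the shared manager.
    // 'backupFile' stands for the stream's local buffer file.
    PutObjectRequest putRequest =
        new PutObjectRequest(bucket, key, backupFile).withMetadata(om);
    Upload upload = transfers.upload(putRequest);   // runs on the shared thread pool
    upload.waitForUploadResult();                   // block until S3 acknowledges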
