HADOOP-13560. S3ABlockOutputStream to support huge (many GB) file writes. 
Contributed by Steve Loughran


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6c348c56
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6c348c56
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6c348c56

Branch: refs/heads/trunk
Commit: 6c348c56918973fd988b110e79231324a8befe12
Parents: b733a6f
Author: Steve Loughran <ste...@apache.org>
Authored: Tue Oct 18 19:33:38 2016 +0100
Committer: Steve Loughran <ste...@apache.org>
Committed: Tue Oct 18 21:16:02 2016 +0100

----------------------------------------------------------------------
 .../src/main/resources/core-default.xml         |  74 +-
 .../hadoop/fs/contract/ContractTestUtils.java   |  16 +-
 hadoop-tools/hadoop-aws/pom.xml                 |  58 +-
 .../s3a/BlockingThreadPoolExecutorService.java  | 168 +---
 .../org/apache/hadoop/fs/s3a/Constants.java     |  71 +-
 .../hadoop/fs/s3a/S3ABlockOutputStream.java     | 703 ++++++++++++++++
 .../org/apache/hadoop/fs/s3a/S3ADataBlocks.java | 821 +++++++++++++++++++
 .../hadoop/fs/s3a/S3AFastOutputStream.java      | 410 ---------
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 408 +++++++--
 .../hadoop/fs/s3a/S3AInstrumentation.java       | 248 +++++-
 .../apache/hadoop/fs/s3a/S3AOutputStream.java   |  57 +-
 .../java/org/apache/hadoop/fs/s3a/S3AUtils.java |  39 +
 .../fs/s3a/SemaphoredDelegatingExecutor.java    | 230 ++++++
 .../org/apache/hadoop/fs/s3a/Statistic.java     |  32 +-
 .../src/site/markdown/tools/hadoop-aws/index.md | 668 +++++++++++++--
 .../fs/contract/s3a/ITestS3AContractDistCp.java |  10 +-
 .../hadoop/fs/s3a/AbstractS3ATestBase.java      |   1 +
 .../ITestBlockingThreadPoolExecutorService.java |  48 +-
 .../hadoop/fs/s3a/ITestS3ABlockOutputArray.java |  90 ++
 .../fs/s3a/ITestS3ABlockOutputByteBuffer.java   |  30 +
 .../hadoop/fs/s3a/ITestS3ABlockOutputDisk.java  |  30 +
 .../fs/s3a/ITestS3ABlockingThreadPool.java      |   2 +
 .../hadoop/fs/s3a/ITestS3AConfiguration.java    |  29 +
 .../ITestS3AEncryptionBlockOutputStream.java    |  36 +
 .../s3a/ITestS3AEncryptionFastOutputStream.java |  35 -
 .../hadoop/fs/s3a/ITestS3AFastOutputStream.java |  74 --
 .../apache/hadoop/fs/s3a/ITestS3ATestUtils.java |  98 +++
 .../apache/hadoop/fs/s3a/S3ATestConstants.java  |  75 +-
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java  | 148 +++-
 .../apache/hadoop/fs/s3a/TestDataBlocks.java    | 124 +++
 .../ITestS3AFileContextStatistics.java          |   1 +
 .../fs/s3a/scale/AbstractSTestS3AHugeFiles.java | 412 ++++++++++
 .../fs/s3a/scale/ITestS3ADeleteManyFiles.java   |  19 +-
 .../s3a/scale/ITestS3AHugeFilesArrayBlocks.java |  31 +
 .../ITestS3AHugeFilesByteBufferBlocks.java      |  34 +
 .../scale/ITestS3AHugeFilesClassicOutput.java   |  41 +
 .../s3a/scale/ITestS3AHugeFilesDiskBlocks.java  |  31 +
 .../hadoop/fs/s3a/scale/S3AScaleTestBase.java   | 151 ++--
 38 files changed, 4647 insertions(+), 906 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 4882728..daa421c 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -994,8 +994,8 @@
 <property>
   <name>fs.s3a.threads.max</name>
   <value>10</value>
-  <description> Maximum number of concurrent active (part)uploads,
-    which each use a thread from the threadpool.</description>
+  <description>The total number of threads available in the filesystem for data
+    uploads *or any other queued filesystem operation*.</description>
 </property>
 
 <property>
@@ -1008,8 +1008,7 @@
 <property>
   <name>fs.s3a.max.total.tasks</name>
   <value>5</value>
-  <description>Number of (part)uploads allowed to the queue before
-    blocking additional uploads.</description>
+  <description>The number of operations which can be queued for execution.</description>
 </property>
 
 <property>
@@ -1047,13 +1046,21 @@
   <name>fs.s3a.multipart.purge</name>
   <value>false</value>
   <description>True if you want to purge existing multipart uploads that may not have been
-     completed/aborted correctly</description>
+    completed/aborted correctly. The corresponding purge age is defined in
+    fs.s3a.multipart.purge.age.
+    If set, when the filesystem is instantiated then all outstanding uploads
+    older than the purge age will be terminated - across the entire bucket.
+    This will impact multipart uploads by other applications and users, so should
+    be used sparingly, with an age value chosen to stop failed uploads, without
+    breaking ongoing operations.
+  </description>
 </property>
 
 <property>
   <name>fs.s3a.multipart.purge.age</name>
   <value>86400</value>
-  <description>Minimum age in seconds of multipart uploads to purge</description>
+  <description>Minimum age in seconds of multipart uploads to purge.
+  </description>
 </property>
 
 <property>
@@ -1086,10 +1093,50 @@
 <property>
   <name>fs.s3a.fast.upload</name>
   <value>false</value>
-  <description>Upload directly from memory instead of buffering to
-    disk first. Memory usage and parallelism can be controlled as up to
-    fs.s3a.multipart.size memory is consumed for each (part)upload actively
-    uploading (fs.s3a.threads.max) or queueing (fs.s3a.max.total.tasks)</description>
+  <description>
+    Use the incremental block-based fast upload mechanism with
+    the buffering mechanism set in fs.s3a.fast.upload.buffer.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.fast.upload.buffer</name>
+  <value>disk</value>
+  <description>
+    The buffering mechanism to use when using S3A fast upload
+    (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
+    This configuration option has no effect if fs.s3a.fast.upload is false.
+
+    "disk" will use the directories listed in fs.s3a.buffer.dir as
+    the location(s) to save data prior to being uploaded.
+
+    "array" uses arrays in the JVM heap
+
+    "bytebuffer" uses off-heap memory within the JVM.
+
+    Both "array" and "bytebuffer" will consume memory in a single stream up to the number
+    of blocks set by:
+
+        fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.
+
+    If using either of these mechanisms, keep this value low
+
+    The total number of threads performing work across all streams is set by
+    fs.s3a.threads.max, with fs.s3a.max.total.tasks setting the number of queued
+    work items.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.fast.upload.active.blocks</name>
+  <value>4</value>
+  <description>
+    Maximum number of blocks a single output stream can have
+    active (uploading, or queued to the central FileSystem
+    instance's pool of queued operations).
+
+    This stops a single stream overloading the shared thread pool.
+  </description>
 </property>
 
 <property>
@@ -1101,13 +1148,6 @@
 </property>
 
 <property>
-  <name>fs.s3a.fast.buffer.size</name>
-  <value>1048576</value>
-  <description>Size of initial memory buffer in bytes allocated for an
-    upload. No effect if fs.s3a.fast.upload is false.</description>
-</property>
-
-<property>
   <name>fs.s3a.user.agent.prefix</name>
   <value></value>
   <description>

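The options above replace the old single-buffer fast output path. As a minimal sketch (values are illustrative and not part of this patch), a client could also enable the block output stream programmatically rather than via core-site.xml:

    import org.apache.hadoop.conf.Configuration;

    public class FastUploadConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // switch from the classic S3AOutputStream to the block-based stream
        conf.setBoolean("fs.s3a.fast.upload", true);
        // buffer blocks on local disk; "array"/"bytebuffer" buffer in the JVM instead
        conf.set("fs.s3a.fast.upload.buffer", "disk");
        // cap the number of blocks a single stream may have uploading or queued at once
        conf.setInt("fs.s3a.fast.upload.active.blocks", 4);
        // each block is at most fs.s3a.multipart.size bytes
        conf.setLong("fs.s3a.multipart.size", 64L * 1024 * 1024);
      }
    }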
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index 03f47c1..16bfb9a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -965,7 +965,7 @@ public class ContractTestUtils extends Assert {
    * @return the number of megabytes/second of the recorded operation
    */
   public static double bandwidthMBs(long bytes, long durationNS) {
-    return (bytes * 1000.0) / durationNS;
+    return bytes / (1024.0 * 1024) * 1.0e9 / durationNS;
   }
 
   /**
@@ -1415,6 +1415,14 @@ public class ContractTestUtils extends Assert {
       return endTime - startTime;
     }
 
+    /**
+     * Intermediate duration of the operation.
+     * @return how much time has passed since the start (in nanos).
+     */
+    public long elapsedTime() {
+      return now() - startTime;
+    }
+
     public double bandwidth(long bytes) {
       return bandwidthMBs(bytes, duration());
     }
@@ -1422,10 +1430,12 @@ public class ContractTestUtils extends Assert {
     /**
      * Bandwidth as bytes per second.
      * @param bytes bytes in
-     * @return the number of bytes per second this operation timed.
+     * @return the number of bytes per second of this operation.
+     *         0 if duration == 0.
      */
     public double bandwidthBytes(long bytes) {
-      return (bytes * 1.0) / duration();
+      double duration = duration();
+      return duration > 0 ? bytes / duration : 0;
     }
 
     /**

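The corrected bandwidthMBs() now converts bytes to megabytes and nanoseconds to seconds explicitly. As a quick hypothetical check (not from the patch), a 100 MB transfer taking two seconds should report 50 MB/s:

    public class BandwidthCheck {
      // mirrors the new ContractTestUtils.bandwidthMBs(bytes, durationNS)
      static double bandwidthMBs(long bytes, long durationNS) {
        return bytes / (1024.0 * 1024) * 1.0e9 / durationNS;
      }

      public static void main(String[] args) {
        long bytes = 100L * 1024 * 1024;      // 100 MB
        long durationNS = 2_000_000_000L;     // 2 seconds in nanoseconds
        System.out.println(bandwidthMBs(bytes, durationNS));  // prints 50.0
      }
    }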
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index 1c1bb02..1f9a6ff 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -35,6 +35,15 @@
     <file.encoding>UTF-8</file.encoding>
     <downloadSources>true</downloadSources>
     <hadoop.tmp.dir>${project.build.directory}/test</hadoop.tmp.dir>
+
+    <!-- are scale tests enabled ? -->
+    <fs.s3a.scale.test.enabled>unset</fs.s3a.scale.test.enabled>
+    <!-- Size in MB of huge files. -->
+    <fs.s3a.scale.test.huge.filesize>unset</fs.s3a.scale.test.huge.filesize>
+    <!-- Size in MB of the partition size in huge file uploads. -->
+    <fs.s3a.scale.test.huge.partitionsize>unset</fs.s3a.scale.test.huge.partitionsize>
+    <!-- Timeout in seconds for scale tests.-->
+    <fs.s3a.scale.test.timeout>3600</fs.s3a.scale.test.timeout>
   </properties>
 
   <profiles>
@@ -115,6 +124,11 @@
                 <!-- substitution.  Putting a prefix in front of it like -->
                <!-- "fork-" makes it work. -->
                <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+                <!-- Propagate scale parameters -->
+                <fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>
+                <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
+                <fs.s3a.scale.test.huge.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.huge.partitionsize>
+                <fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
               </systemPropertyVariables>
             </configuration>
           </plugin>
@@ -132,7 +146,10 @@
                   <forkCount>${testsThreadCount}</forkCount>
                   <reuseForks>false</reuseForks>
                  <argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
+                  <forkedProcessTimeoutInSeconds>${fs.s3a.scale.test.timeout}</forkedProcessTimeoutInSeconds>
                   <systemPropertyVariables>
+                    <!-- Tell tests that they are being executed in parallel -->
+                    <test.parallel.execution>true</test.parallel.execution>
                     <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
                     <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
                     <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
@@ -142,6 +159,11 @@
                    <!-- substitution.  Putting a prefix in front of it like -->
                    <!-- "fork-" makes it work. -->
                    <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+                    <!-- Propagate scale parameters -->
+                    <fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>
+                    <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
+                    <fs.s3a.scale.test.huge.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.huge.partitionsize>
+                    <fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
                   </systemPropertyVariables>
                   <!-- Some tests cannot run in parallel.  Tests that cover -->
                   <!-- access to the root directory must run in isolation -->
@@ -160,10 +182,11 @@
                  <excludes>
                    <exclude>**/ITestJets3tNativeS3FileSystemContract.java</exclude>
                     <exclude>**/ITestS3ABlockingThreadPool.java</exclude>
-                    <exclude>**/ITestS3AFastOutputStream.java</exclude>
                     <exclude>**/ITestS3AFileSystemContract.java</exclude>
                     <exclude>**/ITestS3AMiniYarnCluster.java</exclude>
                     <exclude>**/ITest*Root*.java</exclude>
+                    <exclude>**/ITestS3AFileContextStatistics.java</exclude>
+                    <include>**/ITestS3AHuge*.java</include>
                   </excludes>
                 </configuration>
               </execution>
@@ -174,6 +197,16 @@
                   <goal>verify</goal>
                 </goals>
                <configuration>
+                  <forkedProcessTimeoutInSeconds>${fs.s3a.scale.test.timeout}</forkedProcessTimeoutInSeconds>
+                  <systemPropertyVariables>
+                    <!-- Tell tests that they are being executed sequentially -->
+                    <test.parallel.execution>false</test.parallel.execution>
+                    <!-- Propagate scale parameters -->
+                    <fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>
+                    <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
+                    <fs.s3a.scale.test.huge.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.huge.partitionsize>
+                    <fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
+                  </systemPropertyVariables>
                   <!-- Do a sequential run for tests that cannot handle -->
                   <!-- parallel execution. -->
                   <includes>
@@ -183,6 +216,8 @@
                     <include>**/ITestS3AFileSystemContract.java</include>
                     <include>**/ITestS3AMiniYarnCluster.java</include>
                     <include>**/ITest*Root*.java</include>
+                    <include>**/ITestS3AFileContextStatistics.java</include>
+                    <include>**/ITestS3AHuge*.java</include>
                   </includes>
                 </configuration>
               </execution>
@@ -210,7 +245,13 @@
                   <goal>verify</goal>
                 </goals>
                <configuration>
-                  <forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
+                  <systemPropertyVariables>
+                    <!-- Propagate scale parameters -->
+                    <fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>
+                    <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
+                    <fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
+                  </systemPropertyVariables>
+                  <forkedProcessTimeoutInSeconds>${fs.s3a.scale.test.timeout}</forkedProcessTimeoutInSeconds>
                 </configuration>
               </execution>
             </executions>
@@ -218,6 +259,19 @@
         </plugins>
       </build>
     </profile>
+
+    <!-- Turn on scale tests-->
+    <profile>
+      <id>scale</id>
+      <activation>
+        <property>
+          <name>scale</name>
+        </property>
+      </activation>
+      <properties >
+        <fs.s3a.scale.test.enabled>true</fs.s3a.scale.test.enabled>
+      </properties>
+    </profile>
   </profiles>
 
   <build>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
index 597cce6..5ff96a5 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
@@ -18,30 +18,21 @@
 
 package org.apache.hadoop.fs.s3a;
 
-import java.util.Collection;
-import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.util.concurrent.ForwardingListeningExecutorService;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+
 /**
  * This ExecutorService blocks the submission of new tasks when its queue is
  * already full by using a semaphore. Task submissions require permits, task
@@ -50,17 +41,17 @@ import com.google.common.util.concurrent.MoreExecutors;
  * This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
  * this s4 threadpool</a>
  */
-public class BlockingThreadPoolExecutorService
-    extends ForwardingListeningExecutorService {
+@InterfaceAudience.Private
+final class BlockingThreadPoolExecutorService
+    extends SemaphoredDelegatingExecutor {
 
   private static Logger LOG = LoggerFactory
       .getLogger(BlockingThreadPoolExecutorService.class);
 
-  private Semaphore queueingPermits;
-  private ListeningExecutorService executorDelegatee;
-
   private static final AtomicInteger POOLNUMBER = new AtomicInteger(1);
 
+  private final ThreadPoolExecutor eventProcessingExecutor;
+
   /**
    * Returns a {@link java.util.concurrent.ThreadFactory} that names each
    * created thread uniquely,
@@ -69,7 +60,7 @@ public class BlockingThreadPoolExecutorService
    * @param prefix The prefix of every created Thread's name
    * @return a {@link java.util.concurrent.ThreadFactory} that names threads
    */
-  public static ThreadFactory getNamedThreadFactory(final String prefix) {
+  static ThreadFactory getNamedThreadFactory(final String prefix) {
     SecurityManager s = System.getSecurityManager();
     final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() :
         Thread.currentThread().getThreadGroup();
@@ -113,6 +104,12 @@ public class BlockingThreadPoolExecutorService
     };
   }
 
+  private BlockingThreadPoolExecutorService(int permitCount,
+      ThreadPoolExecutor eventProcessingExecutor) {
+    super(MoreExecutors.listeningDecorator(eventProcessingExecutor),
+        permitCount, false);
+    this.eventProcessingExecutor = eventProcessingExecutor;
+  }
 
   /**
   * A thread pool that blocks clients submitting additional tasks if
@@ -125,10 +122,12 @@ public class BlockingThreadPoolExecutorService
    * @param unit time unit
    * @param prefixName prefix of name for threads
    */
-  public BlockingThreadPoolExecutorService(int activeTasks, int waitingTasks,
-      long keepAliveTime, TimeUnit unit, String prefixName) {
-    super();
-    queueingPermits = new Semaphore(waitingTasks + activeTasks, false);
+  public static BlockingThreadPoolExecutorService newInstance(
+      int activeTasks,
+      int waitingTasks,
+      long keepAliveTime, TimeUnit unit,
+      String prefixName) {
+
     /* Although we generally only expect up to waitingTasks tasks in the
     queue, we need to be able to buffer all tasks in case dequeueing is
     slower than enqueueing. */
@@ -147,126 +146,25 @@ public class BlockingThreadPoolExecutorService
               }
             });
     eventProcessingExecutor.allowCoreThreadTimeOut(true);
-    executorDelegatee =
-        MoreExecutors.listeningDecorator(eventProcessingExecutor);
-
-  }
-
-  @Override
-  protected ListeningExecutorService delegate() {
-    return executorDelegatee;
-  }
-
-  @Override
-  public <T> ListenableFuture<T> submit(Callable<T> task) {
-    try {
-      queueingPermits.acquire();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      return Futures.immediateFailedCheckedFuture(e);
-    }
-    return super.submit(new CallableWithPermitRelease<T>(task));
-  }
-
-  @Override
-  public <T> ListenableFuture<T> submit(Runnable task, T result) {
-    try {
-      queueingPermits.acquire();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      return Futures.immediateFailedCheckedFuture(e);
-    }
-    return super.submit(new RunnableWithPermitRelease(task), result);
-  }
-
-  @Override
-  public ListenableFuture<?> submit(Runnable task) {
-    try {
-      queueingPermits.acquire();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      return Futures.immediateFailedCheckedFuture(e);
-    }
-    return super.submit(new RunnableWithPermitRelease(task));
-  }
-
-  @Override
-  public void execute(Runnable command) {
-    try {
-      queueingPermits.acquire();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
-    super.execute(new RunnableWithPermitRelease(command));
+    return new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
+        eventProcessingExecutor);
   }
 
   /**
-   * Releases a permit after the task is executed.
+   * Get the actual number of active threads.
+   * @return the active thread count
    */
-  class RunnableWithPermitRelease implements Runnable {
-
-    private Runnable delegatee;
-
-    public RunnableWithPermitRelease(Runnable delegatee) {
-      this.delegatee = delegatee;
-    }
-
-    @Override
-    public void run() {
-      try {
-        delegatee.run();
-      } finally {
-        queueingPermits.release();
-      }
-
-    }
-  }
-
-  /**
-   * Releases a permit after the task is completed.
-   */
-  class CallableWithPermitRelease<T> implements Callable<T> {
-
-    private Callable<T> delegatee;
-
-    public CallableWithPermitRelease(Callable<T> delegatee) {
-      this.delegatee = delegatee;
-    }
-
-    @Override
-    public T call() throws Exception {
-      try {
-        return delegatee.call();
-      } finally {
-        queueingPermits.release();
-      }
-    }
-
-  }
-
-  @Override
-  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-      throws InterruptedException {
-    throw new RuntimeException("Not implemented");
+  int getActiveCount() {
+    return eventProcessingExecutor.getActiveCount();
   }
 
   @Override
-  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
-      long timeout, TimeUnit unit) throws InterruptedException {
-    throw new RuntimeException("Not implemented");
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "BlockingThreadPoolExecutorService{");
+    sb.append(super.toString());
+    sb.append(", activeCount=").append(getActiveCount());
+    sb.append('}');
+    return sb.toString();
   }
-
-  @Override
-  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
-      throws InterruptedException, ExecutionException {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
-      TimeUnit unit)
-      throws InterruptedException, ExecutionException, TimeoutException {
-    throw new RuntimeException("Not implemented");
-  }
-
 }

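Since the public constructor is gone, callers now go through the newInstance() factory. A rough usage sketch follows; the pool sizes and thread-name prefix are illustrative, and the class is now package-private, so this only compiles inside org.apache.hadoop.fs.s3a:

    import java.util.concurrent.TimeUnit;

    class ExecutorUsageSketch {
      void demo() {
        BlockingThreadPoolExecutorService executor =
            BlockingThreadPoolExecutorService.newInstance(
                10,                      // active tasks, cf. fs.s3a.threads.max
                5,                       // waiting tasks, cf. fs.s3a.max.total.tasks
                60L, TimeUnit.SECONDS,   // keep-alive for idle threads
                "s3a-transfer-shared");
        // submit() blocks the caller once all active + waiting permits are taken
        executor.submit(new Runnable() {
          @Override
          public void run() {
            // queued work (e.g. a block upload) would run here
          }
        });
        executor.shutdown();
      }
    }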
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 64fd8e5..65df0bf 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -35,6 +35,9 @@ public final class Constants {
   private Constants() {
   }
 
+  /** The minimum multipart size which S3 supports. */
+  public static final int MULTIPART_MIN_SIZE = 5 * 1024 * 1024;
+
   // s3 access key
   public static final String ACCESS_KEY = "fs.s3a.access.key";
 
@@ -124,14 +127,72 @@ public final class Constants {
   // comma separated list of directories
   public static final String BUFFER_DIR = "fs.s3a.buffer.dir";
 
-  // should we upload directly from memory rather than using a file buffer
+  // switch to the fast block-by-block upload mechanism
   public static final String FAST_UPLOAD = "fs.s3a.fast.upload";
   public static final boolean DEFAULT_FAST_UPLOAD = false;
 
   //initial size of memory buffer for a fast upload
+  @Deprecated
   public static final String FAST_BUFFER_SIZE = "fs.s3a.fast.buffer.size";
   public static final int DEFAULT_FAST_BUFFER_SIZE = 1048576; //1MB
 
+  /**
+   * What buffer to use.
+   * Default is {@link #FAST_UPLOAD_BUFFER_DISK}
+   * Value: {@value}
+   */
+  @InterfaceStability.Unstable
+  public static final String FAST_UPLOAD_BUFFER =
+      "fs.s3a.fast.upload.buffer";
+
+  /**
+   * Buffer blocks to disk: {@value}.
+   * Capacity is limited to available disk space.
+   */
+
+  @InterfaceStability.Unstable
+  public static final String FAST_UPLOAD_BUFFER_DISK = "disk";
+
+  /**
+   * Use an in-memory array. Fast but will run out of heap rapidly: {@value}.
+   */
+  @InterfaceStability.Unstable
+  public static final String FAST_UPLOAD_BUFFER_ARRAY = "array";
+
+  /**
+   * Use a byte buffer. May be more memory efficient than the
+   * {@link #FAST_UPLOAD_BUFFER_ARRAY}: {@value}.
+   */
+  @InterfaceStability.Unstable
+  public static final String FAST_UPLOAD_BYTEBUFFER = "bytebuffer";
+
+  /**
+   * Default buffer option: {@value}.
+   */
+  @InterfaceStability.Unstable
+  public static final String DEFAULT_FAST_UPLOAD_BUFFER =
+      FAST_UPLOAD_BUFFER_DISK;
+
+  /**
+   * Maximum number of blocks a single output stream can have
+   * active (uploading, or queued to the central FileSystem
+   * instance's pool of queued operations).
+   * This stops a single stream overloading the shared thread pool.
+   * {@value}
+   * <p>
+   * Default is {@link #DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS}
+   */
+  @InterfaceStability.Unstable
+  public static final String FAST_UPLOAD_ACTIVE_BLOCKS =
+      "fs.s3a.fast.upload.active.blocks";
+
+  /**
+   * Limit of queued block upload operations before writes
+   * block. Value: {@value}
+   */
+  @InterfaceStability.Unstable
+  public static final int DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS = 4;
+
   // Private | PublicRead | PublicReadWrite | AuthenticatedRead |
   // LogDeliveryWrite | BucketOwnerRead | BucketOwnerFullControl
   public static final String CANNED_ACL = "fs.s3a.acl.default";
@@ -145,7 +206,7 @@ public final class Constants {
   // purge any multipart uploads older than this number of seconds
   public static final String PURGE_EXISTING_MULTIPART_AGE =
       "fs.s3a.multipart.purge.age";
-  public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400;
+  public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 86400;
 
   // s3 server-side encryption
   public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
@@ -215,4 +276,10 @@ public final class Constants {
   public static final Class<? extends S3ClientFactory>
       DEFAULT_S3_CLIENT_FACTORY_IMPL =
           S3ClientFactory.DefaultS3ClientFactory.class;
+
+  /**
+   * Maximum number of partitions in a multipart upload: {@value}.
+   */
+  @InterfaceAudience.Private
+  public static final int MAX_MULTIPART_COUNT = 10000;
 }

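Taken together, MULTIPART_MIN_SIZE and MAX_MULTIPART_COUNT bound what one block output stream can write: at the 5 MB minimum part size the 10000-part ceiling works out to roughly 48.8 GiB, so genuinely huge files need a larger fs.s3a.multipart.size. A back-of-envelope check (illustrative, not part of the patch):

    public class MultipartLimits {
      public static void main(String[] args) {
        final long multipartMinSize = 5L * 1024 * 1024;   // Constants.MULTIPART_MIN_SIZE
        final int maxMultipartCount = 10000;              // Constants.MAX_MULTIPART_COUNT
        long maxObjectBytes = multipartMinSize * maxMultipartCount;
        // largest object writable at the minimum part size, in GiB
        System.out.println(maxObjectBytes / (1024.0 * 1024 * 1024) + " GiB"); // ~48.83 GiB
      }
    }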
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
new file mode 100644
index 0000000..b66a23f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -0,0 +1,703 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressEventType;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.util.Progressable;
+
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.Statistic.*;
+
+/**
+ * Upload files/parts directly via different buffering mechanisms,
+ * including memory and disk.
+ *
+ * If the stream is closed and no upload has started, then the upload
+ * is instead done as a single PUT operation.
+ *
+ * Unstable: statistics and error handling might evolve.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class S3ABlockOutputStream extends OutputStream {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(S3ABlockOutputStream.class);
+
+  /** Owner FileSystem. */
+  private final S3AFileSystem fs;
+
+  /** Object being uploaded. */
+  private final String key;
+
+  /** Size of all blocks. */
+  private final int blockSize;
+
+  /** Callback for progress. */
+  private final ProgressListener progressListener;
+  private final ListeningExecutorService executorService;
+
+  /**
+   * Retry policy for multipart commits; not all AWS SDK versions retry that.
+   */
+  private final RetryPolicy retryPolicy =
+      RetryPolicies.retryUpToMaximumCountWithProportionalSleep(
+          5,
+          2000,
+          TimeUnit.MILLISECONDS);
+  /**
+   * Factory for blocks.
+   */
+  private final S3ADataBlocks.BlockFactory blockFactory;
+
+  /** Preallocated byte buffer for writing single characters. */
+  private final byte[] singleCharWrite = new byte[1];
+
+  /** Multipart upload details; null means none started. */
+  private MultiPartUpload multiPartUpload;
+
+  /** Closed flag. */
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  /** Current data block. Null means none currently active */
+  private S3ADataBlocks.DataBlock activeBlock;
+
+  /** Count of blocks uploaded. */
+  private long blockCount = 0;
+
+  /** Statistics to build up. */
+  private final S3AInstrumentation.OutputStreamStatistics statistics;
+
+  /**
+   * Write operation helper; encapsulation of the filesystem operations.
+   */
+  private final S3AFileSystem.WriteOperationHelper writeOperationHelper;
+
+  /**
+   * An S3A output stream which uploads partitions in a separate pool of
+   * threads; different {@link S3ADataBlocks.BlockFactory}
+   * instances can control where data is buffered.
+   *
+   * @param fs S3AFilesystem
+   * @param key S3 object to work on.
+   * @param executorService the executor service to use to schedule work
+   * @param progress report progress in order to prevent timeouts. If
+   * this object implements {@code ProgressListener} then it will be
+   * directly wired up to the AWS client, so receive detailed progress
+   * information.
+   * @param blockSize size of a single block.
+   * @param blockFactory factory for creating stream destinations
+   * @param statistics stats for this stream
+   * @param writeOperationHelper state of the write operation.
+   * @throws IOException on any problem
+   */
+  S3ABlockOutputStream(S3AFileSystem fs,
+      String key,
+      ExecutorService executorService,
+      Progressable progress,
+      long blockSize,
+      S3ADataBlocks.BlockFactory blockFactory,
+      S3AInstrumentation.OutputStreamStatistics statistics,
+      S3AFileSystem.WriteOperationHelper writeOperationHelper)
+      throws IOException {
+    this.fs = fs;
+    this.key = key;
+    this.blockFactory = blockFactory;
+    this.blockSize = (int) blockSize;
+    this.statistics = statistics;
+    this.writeOperationHelper = writeOperationHelper;
+    Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE,
+        "Block size is too small: %d", blockSize);
+    this.executorService = MoreExecutors.listeningDecorator(executorService);
+    this.multiPartUpload = null;
+    this.progressListener = (progress instanceof ProgressListener) ?
+        (ProgressListener) progress
+        : new ProgressableListener(progress);
+    // create that first block. This guarantees that an open + close sequence
+    // writes a 0-byte entry.
+    createBlockIfNeeded();
+    LOG.debug("Initialized S3ABlockOutputStream for {}" +
+        " output to {}", writeOperationHelper, activeBlock);
+  }
+
+  /**
+   * Demand create a destination block.
+   * @return the active block, creating one on demand if needed.
+   * @throws IOException on any failure to create
+   */
+  private synchronized S3ADataBlocks.DataBlock createBlockIfNeeded()
+      throws IOException {
+    if (activeBlock == null) {
+      blockCount++;
+      if (blockCount >= Constants.MAX_MULTIPART_COUNT) {
+        LOG.error("Number of partitions in stream exceeds limit for S3: " +
+            Constants.MAX_MULTIPART_COUNT + "; write may fail.");
+      }
+      activeBlock = blockFactory.create(this.blockSize);
+    }
+    return activeBlock;
+  }
+
+  /**
+   * Synchronized accessor to the active block.
+   * @return the active block; null if there isn't one.
+   */
+  private synchronized S3ADataBlocks.DataBlock getActiveBlock() {
+    return activeBlock;
+  }
+
+  /**
+   * Predicate to query whether or not there is an active block.
+   * @return true if there is an active block.
+   */
+  private synchronized boolean hasActiveBlock() {
+    return activeBlock != null;
+  }
+
+  /**
+   * Clear the active block.
+   */
+  private void clearActiveBlock() {
+    LOG.debug("Clearing active block");
+    synchronized (this) {
+      activeBlock = null;
+    }
+  }
+
+  /**
+   * Check for the filesystem being open.
+   * @throws IOException if the filesystem is closed.
+   */
+  void checkOpen() throws IOException {
+    if (closed.get()) {
+      throw new IOException("Filesystem " + writeOperationHelper + " closed");
+    }
+  }
+
+  /**
+   * The flush operation does not trigger an upload; that awaits
+   * the next block being full. What it does do is call {@code flush() }
+   * on the current block, leaving it to choose how to react.
+   * @throws IOException Any IO problem.
+   */
+  @Override
+  public synchronized void flush() throws IOException {
+    checkOpen();
+    S3ADataBlocks.DataBlock dataBlock = getActiveBlock();
+    if (dataBlock != null) {
+      dataBlock.flush();
+    }
+  }
+
+  /**
+   * Writes a byte to the destination. If this causes the buffer to reach
+   * its limit, the actual upload is submitted to the threadpool.
+   * @param b the int of which the lowest byte is written
+   * @throws IOException on any problem
+   */
+  @Override
+  public synchronized void write(int b) throws IOException {
+    singleCharWrite[0] = (byte)b;
+    write(singleCharWrite, 0, 1);
+  }
+
+  /**
+   * Writes a range of bytes to the memory buffer. If this causes the
+   * buffer to reach its limit, the actual upload is submitted to the
+   * threadpool and the remainder of the array is written to memory
+   * (recursively).
+   * @param source byte array containing data
+   * @param offset offset in array where to start
+   * @param len number of bytes to be written
+   * @throws IOException on any problem
+   */
+  @Override
+  public synchronized void write(byte[] source, int offset, int len)
+      throws IOException {
+
+    S3ADataBlocks.validateWriteArgs(source, offset, len);
+    checkOpen();
+    if (len == 0) {
+      return;
+    }
+    S3ADataBlocks.DataBlock block = createBlockIfNeeded();
+    int written = block.write(source, offset, len);
+    int remainingCapacity = block.remainingCapacity();
+    if (written < len) {
+      // not everything was written —the block has run out
+      // of capacity
+      // Trigger an upload then process the remainder.
+      LOG.debug("writing more data than block has capacity -triggering upload");
+      uploadCurrentBlock();
+      // tail recursion is mildly expensive, but given buffer sizes must be MB,
+      // it's unlikely to recurse very deeply.
+      this.write(source, offset + written, len - written);
+    } else {
+      if (remainingCapacity == 0) {
+        // the whole buffer is done, trigger an upload
+        uploadCurrentBlock();
+      }
+    }
+  }
+
+  /**
+   * Start an asynchronous upload of the current block.
+   * @throws IOException Problems opening the destination for upload
+   * or initializing the upload.
+   */
+  private synchronized void uploadCurrentBlock() throws IOException {
+    Preconditions.checkState(hasActiveBlock(), "No active block");
+    LOG.debug("Writing block # {}", blockCount);
+    if (multiPartUpload == null) {
+      LOG.debug("Initiating Multipart upload");
+      multiPartUpload = new MultiPartUpload();
+    }
+    try {
+      multiPartUpload.uploadBlockAsync(getActiveBlock());
+    } finally {
+      // set the block to null, so the next write will create a new block.
+      clearActiveBlock();
+    }
+  }
+
+  /**
+   * Close the stream.
+   *
+   * This will not return until the upload is complete
+   * or the attempt to perform the upload has failed.
+   * Exceptions raised in this method are indicative that the write has
+   * failed and data is at risk of being lost.
+   * @throws IOException on any failure.
+   */
+  @Override
+  public void close() throws IOException {
+    if (closed.getAndSet(true)) {
+      // already closed
+      LOG.debug("Ignoring close() as stream is already closed");
+      return;
+    }
+    S3ADataBlocks.DataBlock block = getActiveBlock();
+    boolean hasBlock = hasActiveBlock();
+    LOG.debug("{}: Closing block #{}: current block= {}",
+        this,
+        blockCount,
+        hasBlock ? block : "(none)");
+    try {
+      if (multiPartUpload == null) {
+        if (hasBlock) {
+          // no uploads of data have taken place, put the single block up.
+          // This must happen even if there is no data, so that 0 byte files
+          // are created.
+          putObject();
+        }
+      } else {
+        // there has already been at least one block scheduled for upload;
+        // put up the current then wait
+        if (hasBlock && block.hasData()) {
+          //send last part
+          uploadCurrentBlock();
+        }
+        // wait for the partial uploads to finish
+        final List<PartETag> partETags =
+            multiPartUpload.waitForAllPartUploads();
+        // then complete the operation
+        multiPartUpload.complete(partETags);
+      }
+      LOG.debug("Upload complete for {}", writeOperationHelper);
+    } catch (IOException ioe) {
+      writeOperationHelper.writeFailed(ioe);
+      throw ioe;
+    } finally {
+      LOG.debug("Closing block and factory");
+      IOUtils.closeStream(block);
+      IOUtils.closeStream(blockFactory);
+      LOG.debug("Statistics: {}", statistics);
+      IOUtils.closeStream(statistics);
+      clearActiveBlock();
+    }
+    // All end of write operations, including deleting fake parent directories
+    writeOperationHelper.writeSuccessful();
+  }
+
+  /**
+   * Upload the current block as a single PUT request; if the buffer
+   * is empty a 0-byte PUT will be invoked, as it is needed to create an
+   * entry at the far end.
+   * @throws IOException any problem.
+   */
+  private void putObject() throws IOException {
+    LOG.debug("Executing regular upload for {}", writeOperationHelper);
+
+    final S3ADataBlocks.DataBlock block = getActiveBlock();
+    int size = block.dataSize();
+    final PutObjectRequest putObjectRequest =
+        writeOperationHelper.newPutRequest(
+            block.startUpload(),
+            size);
+    long transferQueueTime = now();
+    BlockUploadProgress callback =
+        new BlockUploadProgress(
+            block, progressListener, transferQueueTime);
+    putObjectRequest.setGeneralProgressListener(callback);
+    statistics.blockUploadQueued(size);
+    ListenableFuture<PutObjectResult> putObjectResult =
+        executorService.submit(new Callable<PutObjectResult>() {
+          @Override
+          public PutObjectResult call() throws Exception {
+            PutObjectResult result = fs.putObjectDirect(putObjectRequest);
+            block.close();
+            return result;
+          }
+        });
+    clearActiveBlock();
+    //wait for completion
+    try {
+      putObjectResult.get();
+    } catch (InterruptedException ie) {
+      LOG.warn("Interrupted object upload", ie);
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException ee) {
+      throw extractException("regular upload", key, ee);
+    }
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "S3ABlockOutputStream{");
+    sb.append(writeOperationHelper.toString());
+    sb.append(", blockSize=").append(blockSize);
+    // unsynced access; risks consistency in exchange for no risk of deadlock.
+    S3ADataBlocks.DataBlock block = activeBlock;
+    if (block != null) {
+      sb.append(", activeBlock=").append(block);
+    }
+    sb.append('}');
+    return sb.toString();
+  }
+
+  private void incrementWriteOperations() {
+    fs.incrementWriteOperations();
+  }
+
+  /**
+   * Current time in milliseconds.
+   * @return time
+   */
+  private long now() {
+    return System.currentTimeMillis();
+  }
+
+  /**
+   * Multiple partition upload.
+   */
+  private class MultiPartUpload {
+    private final String uploadId;
+    private final List<ListenableFuture<PartETag>> partETagsFutures;
+
+    public MultiPartUpload() throws IOException {
+      this.uploadId = writeOperationHelper.initiateMultiPartUpload();
+      this.partETagsFutures = new ArrayList<>(2);
+      LOG.debug("Initiated multi-part upload for {} with " +
+          "id '{}'", writeOperationHelper, uploadId);
+    }
+
+    /**
+     * Upload a block of data.
+     * This will take the block and queue it for asynchronous upload as the next part.
+     * @param block block to upload
+     * @throws IOException upload failure
+     */
+    private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
+        throws IOException {
+      LOG.debug("Queueing upload of {}", block);
+      final int size = block.dataSize();
+      final InputStream uploadStream = block.startUpload();
+      final int currentPartNumber = partETagsFutures.size() + 1;
+      final UploadPartRequest request =
+          writeOperationHelper.newUploadPartRequest(
+              uploadId,
+              uploadStream,
+              currentPartNumber,
+              size);
+      long transferQueueTime = now();
+      BlockUploadProgress callback =
+          new BlockUploadProgress(
+              block, progressListener, transferQueueTime);
+      request.setGeneralProgressListener(callback);
+      statistics.blockUploadQueued(block.dataSize());
+      ListenableFuture<PartETag> partETagFuture =
+          executorService.submit(new Callable<PartETag>() {
+            @Override
+            public PartETag call() throws Exception {
+              // this is the queued upload operation
+              LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
+                  uploadId);
+              // do the upload
+              PartETag partETag = fs.uploadPart(request).getPartETag();
+              LOG.debug("Completed upload of {}", block);
+              LOG.debug("Stream statistics of {}", statistics);
+
+              // close the block
+              block.close();
+              return partETag;
+            }
+          });
+      partETagsFutures.add(partETagFuture);
+    }
+
+    /**
+     * Block awaiting all outstanding uploads to complete.
+     * @return list of results
+     * @throws IOException IO Problems
+     */
+    private List<PartETag> waitForAllPartUploads() throws IOException {
+      LOG.debug("Waiting for {} uploads to complete", partETagsFutures.size());
+      try {
+        return Futures.allAsList(partETagsFutures).get();
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted partUpload", ie);
+        Thread.currentThread().interrupt();
+        return null;
+      } catch (ExecutionException ee) {
+        //there is no way of recovering so abort
+        //cancel all partUploads
+        LOG.debug("While waiting for upload completion", ee);
+        LOG.debug("Cancelling futures");
+        for (ListenableFuture<PartETag> future : partETagsFutures) {
+          future.cancel(true);
+        }
+        //abort multipartupload
+        this.abort();
+        throw extractException("Multi-part upload with id '" + uploadId
+                + "' to " + key, key, ee);
+      }
+    }
+
+    /**
+     * This completes a multipart upload.
+     * Sometimes it fails; here retries are handled to avoid losing all data
+     * on a transient failure.
+     * @param partETags list of partial uploads
+     * @throws IOException on any problem
+     */
+    private CompleteMultipartUploadResult complete(List<PartETag> partETags)
+        throws IOException {
+      int retryCount = 0;
+      AmazonClientException lastException;
+      String operation =
+          String.format("Completing multi-part upload for key '%s'," +
+                  " id '%s' with %s partitions ",
+              key, uploadId, partETags.size());
+      do {
+        try {
+          LOG.debug(operation);
+          return writeOperationHelper.completeMultipartUpload(
+                  uploadId,
+                  partETags);
+        } catch (AmazonClientException e) {
+          lastException = e;
+          statistics.exceptionInMultipartComplete();
+        }
+      } while (shouldRetry(operation, lastException, retryCount++));
+      // this point is only reached if the operation failed more than
+      // the allowed retry count
+      throw translateException(operation, key, lastException);
+    }
+
+    /**
+     * Abort a multi-part upload. Retries are attempted on failures.
+     * IOExceptions are caught; this is expected to be run as a cleanup process.
+     */
+    public void abort() {
+      int retryCount = 0;
+      AmazonClientException lastException;
+      fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED);
+      String operation =
+          String.format("Aborting multi-part upload for '%s', id '%s'",
+              writeOperationHelper, uploadId);
+      do {
+        try {
+          LOG.debug(operation);
+          writeOperationHelper.abortMultipartUpload(uploadId);
+          return;
+        } catch (AmazonClientException e) {
+          lastException = e;
+          statistics.exceptionInMultipartAbort();
+        }
+      } while (shouldRetry(operation, lastException, retryCount++));
+      // this point is only reached if the operation failed more than
+      // the allowed retry count
+      LOG.warn("Unable to abort multipart upload, you may need to purge " +
+          "uploaded parts", lastException);
+    }
+
+    /**
+     * Predicate to determine whether a failed operation should
+     * be attempted again.
+     * If a retry is advised, the exception is automatically logged and
+     * the filesystem statistic {@link Statistic#IGNORED_ERRORS} incremented.
+     * The method then sleeps for the sleep time suggested by the sleep policy;
+     * if the sleep is interrupted then {@code Thread.interrupted()} is set
+     * to indicate the thread was interrupted; then false is returned.
+     *
+     * @param operation operation for log message
+     * @param e exception raised.
+     * @param retryCount  number of retries already attempted
+     * @return true if another attempt should be made
+     */
+    private boolean shouldRetry(String operation,
+        AmazonClientException e,
+        int retryCount) {
+      try {
+        RetryPolicy.RetryAction retryAction =
+            retryPolicy.shouldRetry(e, retryCount, 0, true);
+        boolean retry = retryAction == RetryPolicy.RetryAction.RETRY;
+        if (retry) {
+          fs.incrementStatistic(IGNORED_ERRORS);
+          LOG.info("Retrying {} after exception ", operation, e);
+          Thread.sleep(retryAction.delayMillis);
+        }
+        return retry;
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        return false;
+      } catch (Exception ignored) {
+        return false;
+      }
+    }
+
+  }
+
+  /**
+   * The upload progress listener registered for events returned
+   * during the upload of a single block.
+   * It updates statistics and handles the end of the upload.
+   * Transfer failures are logged at WARN.
+   */
+  private final class BlockUploadProgress implements ProgressListener {
+    private final S3ADataBlocks.DataBlock block;
+    private final ProgressListener nextListener;
+    private final long transferQueueTime;
+    private long transferStartTime;
+
+    /**
+     * Track the progress of a single block upload.
+     * @param block block to monitor
+     * @param nextListener optional next progress listener
+     * @param transferQueueTime time the block was transferred
+     * into the queue
+     */
+    private BlockUploadProgress(S3ADataBlocks.DataBlock block,
+        ProgressListener nextListener,
+        long transferQueueTime) {
+      this.block = block;
+      this.transferQueueTime = transferQueueTime;
+      this.nextListener = nextListener;
+    }
+
+    @Override
+    public void progressChanged(ProgressEvent progressEvent) {
+      ProgressEventType eventType = progressEvent.getEventType();
+      long bytesTransferred = progressEvent.getBytesTransferred();
+
+      int size = block.dataSize();
+      switch (eventType) {
+
+      case REQUEST_BYTE_TRANSFER_EVENT:
+        // bytes uploaded
+        statistics.bytesTransferred(bytesTransferred);
+        break;
+
+      case TRANSFER_PART_STARTED_EVENT:
+        transferStartTime = now();
+        statistics.blockUploadStarted(transferStartTime - transferQueueTime,
+            size);
+        incrementWriteOperations();
+        break;
+
+      case TRANSFER_PART_COMPLETED_EVENT:
+        statistics.blockUploadCompleted(now() - transferStartTime, size);
+        break;
+
+      case TRANSFER_PART_FAILED_EVENT:
+        statistics.blockUploadFailed(now() - transferStartTime, size);
+        LOG.warn("Transfer failure of block {}", block);
+        break;
+
+      default:
+        // nothing
+      }
+
+      if (nextListener != null) {
+        nextListener.progressChanged(progressEvent);
+      }
+    }
+  }
+
+  /**
+   * Bridge from AWS {@code ProgressListener} to Hadoop {@link Progressable}.
+   */
+  private static class ProgressableListener implements ProgressListener {
+    private final Progressable progress;
+
+    public ProgressableListener(Progressable progress) {
+      this.progress = progress;
+    }
+
+    public void progressChanged(ProgressEvent progressEvent) {
+      if (progress != null) {
+        progress.progress();
+      }
+    }
+  }
+
+}

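Applications never construct S3ABlockOutputStream directly; with fs.s3a.fast.upload enabled, S3AFileSystem.create() wraps it in the FSDataOutputStream it returns. A hedged end-to-end sketch, where the bucket, path and sizes are placeholders:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HugeWriteSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("fs.s3a.fast.upload", true);
        conf.set("fs.s3a.fast.upload.buffer", "disk");  // or "array" / "bytebuffer"

        FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
        byte[] chunk = new byte[8 * 1024 * 1024];
        try (FSDataOutputStream out = fs.create(new Path("s3a://example-bucket/huge.bin"))) {
          // each filled block is queued as a multipart part while writing continues
          for (int i = 0; i < 1024; i++) {
            out.write(chunk);
          }
        } // close() uploads the final block and completes the multipart upload
      }
    }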
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
new file mode 100644
index 0000000..0fe2af7
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
@@ -0,0 +1,821 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.util.DirectBufferPool;
+
+import static org.apache.hadoop.fs.s3a.S3ADataBlocks.DataBlock.DestState.*;
+
+/**
+ * Set of classes to support output streaming into blocks which are then
+ * uploaded as partitions.
+ */
+final class S3ADataBlocks {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(S3ADataBlocks.class);
+
+  private S3ADataBlocks() {
+  }
+
+  /**
+   * Validate args to a write command. These are the same validation checks
+   * expected for any implementation of {@code OutputStream.write()}.
+   * @param b byte array containing data
+   * @param off offset in array where to start
+   * @param len number of bytes to be written
+   * @throws NullPointerException for a null buffer
+   * @throws IndexOutOfBoundsException if indices are out of range
+   */
+  static void validateWriteArgs(byte[] b, int off, int len)
+      throws IOException {
+    Preconditions.checkNotNull(b);
+    if ((off < 0) || (off > b.length) || (len < 0) ||
+        ((off + len) > b.length) || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException(
+          "write (b[" + b.length + "], " + off + ", " + len + ')');
+    }
+  }
+
+  /**
+   * Create a factory.
+   * @param owner factory owner
+   * @param name factory name; the buffer option defined in {@link Constants}.
+   * @return the factory, ready to be initialized.
+   * @throws IllegalArgumentException if the name is unknown.
+   */
+  static BlockFactory createFactory(S3AFileSystem owner,
+      String name) {
+    switch (name) {
+    case Constants.FAST_UPLOAD_BUFFER_ARRAY:
+      return new ArrayBlockFactory(owner);
+    case Constants.FAST_UPLOAD_BUFFER_DISK:
+      return new DiskBlockFactory(owner);
+    case Constants.FAST_UPLOAD_BYTEBUFFER:
+      return new ByteBufferBlockFactory(owner);
+    default:
+      throw new IllegalArgumentException("Unsupported block buffer" +
+          " \"" + name + '"');
+    }
+  }
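+
+  /*
+   * A minimal sketch of how a caller might choose a factory from
+   * configuration. The 'conf' and 'owner' variables, and the option key
+   * Constants.FAST_UPLOAD_BUFFER with default DEFAULT_FAST_UPLOAD_BUFFER,
+   * are assumptions; only the three buffer names in the switch above are
+   * confirmed here.
+   *
+   *   String buffering = conf.getTrimmed(Constants.FAST_UPLOAD_BUFFER,
+   *       Constants.DEFAULT_FAST_UPLOAD_BUFFER);
+   *   BlockFactory factory = S3ADataBlocks.createFactory(owner, buffering);
+   */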
+
+  /**
+   * Base class for block factories.
+   */
+  abstract static class BlockFactory implements Closeable {
+
+    private final S3AFileSystem owner;
+
+    protected BlockFactory(S3AFileSystem owner) {
+      this.owner = owner;
+    }
+
+
+    /**
+     * Create a block.
+     * @param limit maximum size of the block, in bytes.
+     * @return a new block.
+     * @throws IOException IO problems.
+     */
+    abstract DataBlock create(int limit) throws IOException;
+
+    /**
+     * Implement any close/cleanup operation.
+     * The base class is a no-op.
+     * @throws IOException ideally, this should never be raised.
+     */
+    @Override
+    public void close() throws IOException {
+    }
+
+    /**
+     * Get the owner of this factory.
+     * @return the owning filesystem instance.
+     */
+    protected S3AFileSystem getOwner() {
+      return owner;
+    }
+  }
+
+  /**
+   * This represents a block being uploaded.
+   */
+  abstract static class DataBlock implements Closeable {
+
+    enum DestState {Writing, Upload, Closed}
+
+    private volatile DestState state = Writing;
+
+    /**
+     * Atomically enter a state, verifying current state.
+     * @param current current state. null means "no check"
+     * @param next next state
+     * @throws IllegalStateException if the current state is not as expected
+     */
+    protected final synchronized void enterState(DestState current,
+        DestState next)
+        throws IllegalStateException {
+      verifyState(current);
+      LOG.debug("{}: entering state {}", this, next);
+      state = next;
+    }
+
+    /**
+     * Verify that the block is in the declared state.
+     * @param expected expected state.
+     * @throws IllegalStateException if the DataBlock is in the wrong state
+     */
+    protected final void verifyState(DestState expected)
+        throws IllegalStateException {
+      if (expected != null && state != expected) {
+        throw new IllegalStateException("Expected stream state " + expected
+            + " -but actual state is " + state + " in " + this);
+      }
+    }
+
+    /**
+     * Current state.
+     * @return the current state.
+     */
+    final DestState getState() {
+      return state;
+    }
+
+    /**
+     * Return the current data size.
+     * @return the size of the data
+     */
+    abstract int dataSize();
+
+    /**
+     * Predicate to verify that the block has the capacity to write
+     * the given set of bytes.
+     * @param bytes number of bytes desired to be written.
+     * @return true if there is enough space.
+     */
+    abstract boolean hasCapacity(long bytes);
+
+    /**
+     * Predicate to check if there is data in the block.
+     * @return true if there is data in the block.
+     */
+    boolean hasData() {
+      return dataSize() > 0;
+    }
+
+    /**
+     * The remaining capacity in the block before it is full.
+     * @return the number of bytes remaining.
+     */
+    abstract int remainingCapacity();
+
+    /**
+     * Write a series of bytes from the buffer, from the offset.
+     * Returns the number of bytes written.
+     * Only valid in the state {@code Writing}.
+     * Base class verifies the state but does no writing.
+     * @param buffer buffer
+     * @param offset offset
+     * @param length length of write
+     * @return number of bytes written
+     * @throws IOException trouble
+     */
+    int write(byte[] buffer, int offset, int length) throws IOException {
+      verifyState(Writing);
+      Preconditions.checkArgument(buffer != null, "Null buffer");
+      Preconditions.checkArgument(length >= 0, "length is negative");
+      Preconditions.checkArgument(offset >= 0, "offset is negative");
+      Preconditions.checkArgument(
+          !(buffer.length - offset < length),
+          "buffer shorter than amount of data to write");
+      return 0;
+    }
+
+    /**
+     * Flush the output.
+     * Only valid in the state {@code Writing}.
+     * In the base class, this is a no-op
+     * @throws IOException any IO problem.
+     */
+    void flush() throws IOException {
+      verifyState(Writing);
+    }
+
+    /**
+     * Switch to the upload state and return a stream for uploading.
+     * Base class calls {@link #enterState(DestState, DestState)} to
+     * manage the state machine.
+     * @return the stream
+     * @throws IOException trouble
+     */
+    InputStream startUpload() throws IOException {
+      LOG.debug("Start datablock upload");
+      enterState(Writing, Upload);
+      return null;
+    }
+
+    /**
+     * Enter the closed state.
+     * @return true if the class was in any other state, implying that
+     * the subclass should do its close operations
+     */
+    protected synchronized boolean enterClosedState() {
+      if (state != Closed) {
+        enterState(null, Closed);
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (enterClosedState()) {
+        LOG.debug("Closed {}", this);
+        innerClose();
+      }
+    }
+
+    /**
+     * Inner close logic for subclasses to implement.
+     * @throws IOException IO problems.
+     */
+    protected void innerClose() throws IOException {
+
+    }
+
+  }
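+
+  /*
+   * A minimal sketch of the intended DataBlock lifecycle, based on the
+   * state machine above (Writing -> Upload -> Closed). The 'factory',
+   * 'blockSize' and 'data' variables are assumptions.
+   *
+   *   DataBlock block = factory.create(blockSize);         // state: Writing
+   *   int written = block.write(data, 0, data.length);
+   *   // if written < data.length the block is full: upload it and put the
+   *   // remainder into a new block from the factory
+   *   InputStream upload = block.startUpload();             // Writing -> Upload
+   *   // ... upload the stream as one part of a multipart upload ...
+   *   block.close();                                        // -> Closed
+   */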
+
+  // ====================================================================
+
+  /**
+   * Use byte arrays on the heap for storage.
+   */
+  static class ArrayBlockFactory extends BlockFactory {
+
+    ArrayBlockFactory(S3AFileSystem owner) {
+      super(owner);
+    }
+
+    @Override
+    DataBlock create(int limit) throws IOException {
+      return new ByteArrayBlock(limit);
+    }
+
+  }
+
+  /**
+   * Stream to memory via a {@code ByteArrayOutputStream}.
+   *
+   * This was taken from {@code S3AFastOutputStream} and has the
+   * same problem which surfaced there: it can consume a lot of heap space,
+   * proportional to the mismatch between the rate at which data is written
+   * to the stream and the JVM-wide upload bandwidth to the S3 endpoint.
+   * The memory consumption can be limited by tuning the filesystem settings
+   * to restrict the number of queued/active uploads.
+   */
+  static class ByteArrayBlock extends DataBlock {
+    private ByteArrayOutputStream buffer;
+    private final int limit;
+    // cache data size so that it is consistent after the buffer is reset.
+    private Integer dataSize;
+
+    ByteArrayBlock(int limit) {
+      this.limit = limit;
+      buffer = new ByteArrayOutputStream();
+    }
+
+    /**
+     * Get the amount of data; if there is no buffer then the size is 0.
+     * @return the amount of data available to upload.
+     */
+    @Override
+    int dataSize() {
+      return dataSize != null ? dataSize : buffer.size();
+    }
+
+    @Override
+    InputStream startUpload() throws IOException {
+      super.startUpload();
+      dataSize = buffer.size();
+      ByteArrayInputStream bufferData = new ByteArrayInputStream(
+          buffer.toByteArray());
+      buffer = null;
+      return bufferData;
+    }
+
+    @Override
+    boolean hasCapacity(long bytes) {
+      return dataSize() + bytes <= limit;
+    }
+
+    @Override
+    int remainingCapacity() {
+      return limit - dataSize();
+    }
+
+    @Override
+    int write(byte[] b, int offset, int len) throws IOException {
+      super.write(b, offset, len);
+      int written = Math.min(remainingCapacity(), len);
+      buffer.write(b, offset, written);
+      return written;
+    }
+
+    @Override
+    protected void innerClose() {
+      buffer = null;
+    }
+
+    @Override
+    public String toString() {
+      return "ByteArrayBlock{" +
+          "state=" + getState() +
+          ", limit=" + limit +
+          ", dataSize=" + dataSize +
+          '}';
+    }
+  }
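+
+  /*
+   * A hedged sketch of how heap use of array buffering might be bounded,
+   * per the note in the ByteArrayBlock javadoc above: restrict the number
+   * of active and queued uploads. The option names below are assumptions
+   * about the S3A configuration keys and are not confirmed in this file.
+   *
+   *   conf.setInt("fs.s3a.fast.upload.active.blocks", 4); // blocks per stream
+   *   conf.setInt("fs.s3a.threads.max", 8);               // upload threads
+   *   conf.setInt("fs.s3a.max.total.tasks", 4);           // queued uploads
+   */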
+
+  // ====================================================================
+
+  /**
+   * Stream via direct ByteBuffers; these are allocated off-heap
+   * via {@link DirectBufferPool}.
+   * This is actually the most complex of all the block factories,
+   * due to the need to explicitly recycle buffers; in comparison, the
+   * {@link DiskBlock} buffer delegates the work of deleting files to
+   * the {@link DiskBlock.FileDeletingInputStream}. Here the
+   * input stream {@link ByteBufferInputStream} has a similar task, along
+   * with the foundational work of streaming data from the buffer.
+   */
+  static class ByteBufferBlockFactory extends BlockFactory {
+
+    private final DirectBufferPool bufferPool = new DirectBufferPool();
+    private final AtomicInteger buffersOutstanding = new AtomicInteger(0);
+
+    ByteBufferBlockFactory(S3AFileSystem owner) {
+      super(owner);
+    }
+
+    @Override
+    ByteBufferBlock create(int limit) throws IOException {
+      return new ByteBufferBlock(limit);
+    }
+
+    private ByteBuffer requestBuffer(int limit) {
+      LOG.debug("Requesting buffer of size {}", limit);
+      buffersOutstanding.incrementAndGet();
+      return bufferPool.getBuffer(limit);
+    }
+
+    private void releaseBuffer(ByteBuffer buffer) {
+      LOG.debug("Releasing buffer");
+      bufferPool.returnBuffer(buffer);
+      buffersOutstanding.decrementAndGet();
+    }
+
+    /**
+     * Get count of outstanding buffers.
+     * @return the current buffer count
+     */
+    public int getOutstandingBufferCount() {
+      return buffersOutstanding.get();
+    }
+
+    @Override
+    public String toString() {
+      return "ByteBufferBlockFactory{"
+          + "buffersOutstanding=" + buffersOutstanding +
+          '}';
+    }
+
+    /**
+     * A DataBlock which requests a buffer from the pool on creation; the
+     * buffer is returned to the pool when the stream from
+     * {@link #startUpload()} is closed.
+     */
+    class ByteBufferBlock extends DataBlock {
+      private ByteBuffer buffer;
+      private final int bufferSize;
+      // cache data size so that it is consistent after the buffer is reset.
+      private Integer dataSize;
+
+      /**
+       * Instantiate. This will request a ByteBuffer of the desired size.
+       * @param bufferSize buffer size
+       */
+      ByteBufferBlock(int bufferSize) {
+        this.bufferSize = bufferSize;
+        buffer = requestBuffer(bufferSize);
+      }
+
+      /**
+       * Get the amount of data; if there is no buffer then the size is 0.
+       * @return the amount of data available to upload.
+       */
+      @Override
+      int dataSize() {
+        return dataSize != null ? dataSize : bufferCapacityUsed();
+      }
+
+      @Override
+      ByteBufferInputStream startUpload() throws IOException {
+        super.startUpload();
+        dataSize = bufferCapacityUsed();
+        // set the buffer up for reading from the beginning
+        buffer.limit(buffer.position());
+        buffer.position(0);
+        return new ByteBufferInputStream(dataSize, buffer);
+      }
+
+      @Override
+      public boolean hasCapacity(long bytes) {
+        return bytes <= remainingCapacity();
+      }
+
+      @Override
+      public int remainingCapacity() {
+        return buffer != null ? buffer.remaining() : 0;
+      }
+
+      private int bufferCapacityUsed() {
+        return buffer.capacity() - buffer.remaining();
+      }
+
+      @Override
+      int write(byte[] b, int offset, int len) throws IOException {
+        super.write(b, offset, len);
+        int written = Math.min(remainingCapacity(), len);
+        buffer.put(b, offset, written);
+        return written;
+      }
+
+      @Override
+      protected void innerClose() {
+        buffer = null;
+      }
+
+      @Override
+      public String toString() {
+        return "ByteBufferBlock{"
+            + "state=" + getState() +
+            ", dataSize=" + dataSize() +
+            ", limit=" + bufferSize +
+            ", remainingCapacity=" + remainingCapacity() +
+            '}';
+      }
+
+    }
+
+    /**
+     * Provide an input stream from a byte buffer; this supports
+     * {@link #mark(int)}, which is required to enable replay of failed
+     * PUT attempts.
+     * The stream returns the buffer to the pool when it is closed.
+     */
+    class ByteBufferInputStream extends InputStream {
+
+      private final int size;
+      private ByteBuffer byteBuffer;
+
+      ByteBufferInputStream(int size, ByteBuffer byteBuffer) {
+        LOG.debug("Creating ByteBufferInputStream of size {}", size);
+        this.size = size;
+        this.byteBuffer = byteBuffer;
+      }
+
+      /**
+       * Return the buffer to the pool after the stream is closed.
+       */
+      @Override
+      public synchronized void close() {
+        if (byteBuffer != null) {
+          LOG.debug("releasing buffer");
+          releaseBuffer(byteBuffer);
+          byteBuffer = null;
+        }
+      }
+
+      /**
+       * Verify that the stream is open.
+       * @throws IOException if the stream is closed
+       */
+      private void verifyOpen() throws IOException {
+        if (byteBuffer == null) {
+          throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+        }
+      }
+
+      @Override
+      public synchronized int read() throws IOException {
+        if (available() > 0) {
+          return byteBuffer.get() & 0xFF;
+        } else {
+          return -1;
+        }
+      }
+
+      @Override
+      public synchronized long skip(long offset) throws IOException {
+        verifyOpen();
+        long newPos = position() + offset;
+        if (newPos < 0) {
+          throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+        }
+        if (newPos > size) {
+          throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+        }
+        byteBuffer.position((int) newPos);
+        return newPos;
+      }
+
+      @Override
+      public synchronized int available() {
+        Preconditions.checkState(byteBuffer != null,
+            FSExceptionMessages.STREAM_IS_CLOSED);
+        return byteBuffer.remaining();
+      }
+
+      /**
+       * Get the current buffer position.
+       * @return the buffer position
+       */
+      public synchronized int position() {
+        return byteBuffer.position();
+      }
+
+      /**
+       * Check if there is data left.
+       * @return true if there is data remaining in the buffer.
+       */
+      public synchronized boolean hasRemaining() {
+        return byteBuffer.hasRemaining();
+      }
+
+      @Override
+      public synchronized void mark(int readlimit) {
+        LOG.debug("mark at {}", position());
+        byteBuffer.mark();
+      }
+
+      @Override
+      public synchronized void reset() throws IOException {
+        LOG.debug("reset");
+        byteBuffer.reset();
+      }
+
+      @Override
+      public boolean markSupported() {
+        return true;
+      }
+
+      /**
+       * Read in data.
+       * @param buffer destination buffer
+       * @param offset offset within the buffer
+       * @param length length of bytes to read
+       * @return the number of bytes read, or -1 at the end of the stream
+       * @throws EOFException if the position is negative
+       * @throws IndexOutOfBoundsException if there isn't space for the
+       * amount of data requested.
+       * @throws IllegalArgumentException other arguments are invalid.
+       */
+      @SuppressWarnings("NullableProblems")
+      @Override
+      public synchronized int read(byte[] buffer, int offset, int length)
+          throws IOException {
+        Preconditions.checkArgument(length >= 0, "length is negative");
+        Preconditions.checkArgument(buffer != null, "Null buffer");
+        if (buffer.length - offset < length) {
+          throw new IndexOutOfBoundsException(
+              FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
+                  + ": request length =" + length
+                  + ", with offset =" + offset
+                  + "; buffer capacity =" + (buffer.length - offset));
+        }
+        verifyOpen();
+        if (!hasRemaining()) {
+          return -1;
+        }
+
+        int toRead = Math.min(length, available());
+        byteBuffer.get(buffer, offset, toRead);
+        return toRead;
+      }
+
+      @Override
+      public String toString() {
+        final StringBuilder sb = new StringBuilder(
+            "ByteBufferInputStream{");
+        sb.append("size=").append(size);
+        ByteBuffer buffer = this.byteBuffer;
+        if (buffer != null) {
+          sb.append(", available=").append(buffer.remaining());
+        }
+        sb.append('}');
+        return sb.toString();
+      }
+    }
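+
+    /*
+     * A small sketch of the mark/reset replay path the stream above is
+     * designed for; 'in' is a stream from startUpload(), and putPart()
+     * stands in for whatever performs the PUT and is an assumption, not an
+     * API defined here.
+     *
+     *   in.mark(0);          // remember the start of the part
+     *   try {
+     *     putPart(in);
+     *   } catch (IOException e) {
+     *     in.reset();        // rewind to the mark and retry
+     *     putPart(in);
+     *   }
+     */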
+  }
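+
+  /*
+   * A minimal sketch, assuming 'fs', 'limit' and 'data' variables, of the
+   * buffer recycling cycle described above: a direct buffer is requested
+   * when the block is created and returned to the pool when the upload
+   * stream is closed, so getOutstandingBufferCount() should drop back to 0.
+   *
+   *   ByteBufferBlockFactory factory = new ByteBufferBlockFactory(fs);
+   *   DataBlock block = factory.create(limit);
+   *   block.write(data, 0, data.length);
+   *   try (InputStream in = block.startUpload()) {
+   *     // ... upload from 'in' ...
+   *   }                                  // buffer returned to the pool here
+   *   block.close();
+   */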
+
+  // ====================================================================
+
+  /**
+   * Buffer blocks to disk.
+   */
+  static class DiskBlockFactory extends BlockFactory {
+
+    DiskBlockFactory(S3AFileSystem owner) {
+      super(owner);
+    }
+
+    /**
+     * Create a temp file and a block which writes to it.
+     * @param limit limit of the block.
+     * @return the new block
+     * @throws IOException IO problems
+     */
+    @Override
+    DataBlock create(int limit) throws IOException {
+      File destFile = getOwner()
+          .createTmpFileForWrite("s3ablock", limit, getOwner().getConf());
+      return new DiskBlock(destFile, limit);
+    }
+  }
+
+  /**
+   * Stream to a file.
+   * This will stop at the limit; the caller is expected to create a new
+   * block when this one is full.
+   */
+  static class DiskBlock extends DataBlock {
+
+    private int bytesWritten;
+    private final File bufferFile;
+    private final int limit;
+    private BufferedOutputStream out;
+    private InputStream uploadStream;
+
+    DiskBlock(File bufferFile, int limit)
+        throws FileNotFoundException {
+      this.limit = limit;
+      this.bufferFile = bufferFile;
+      out = new BufferedOutputStream(new FileOutputStream(bufferFile));
+    }
+
+    @Override
+    int dataSize() {
+      return bytesWritten;
+    }
+
+    @Override
+    boolean hasCapacity(long bytes) {
+      return dataSize() + bytes <= limit;
+    }
+
+    @Override
+    int remainingCapacity() {
+      return limit - bytesWritten;
+    }
+
+    @Override
+    int write(byte[] b, int offset, int len) throws IOException {
+      super.write(b, offset, len);
+      int written = Math.min(remainingCapacity(), len);
+      out.write(b, offset, written);
+      bytesWritten += written;
+      return written;
+    }
+
+    @Override
+    InputStream startUpload() throws IOException {
+      super.startUpload();
+      try {
+        out.flush();
+      } finally {
+        out.close();
+        out = null;
+      }
+      uploadStream = new FileInputStream(bufferFile);
+      return new FileDeletingInputStream(uploadStream);
+    }
+
+    /**
+     * The close operation will delete the destination file if it still
+     * exists.
+     * @throws IOException IO problems
+     */
+    @Override
+    protected void innerClose() throws IOException {
+      final DestState state = getState();
+      LOG.debug("Closing {}", this);
+      switch (state) {
+      case Writing:
+        if (bufferFile.exists()) {
+          // file was not uploaded
+          LOG.debug("Deleting buffer file as upload did not start");
+          boolean deleted = bufferFile.delete();
+          if (!deleted && bufferFile.exists()) {
+            LOG.warn("Failed to delete buffer file {}", bufferFile);
+          }
+        }
+        break;
+
+      case Upload:
+        LOG.debug("Buffer file {} exists —close upload stream", bufferFile);
+        break;
+
+      case Closed:
+        // no-op
+        break;
+
+      default:
+        // this state can never be reached, but checkstyle complains, so
+        // it is here.
+      }
+    }
+
+    /**
+     * Flush operation will flush to disk.
+     * @throws IOException IOE raised on FileOutputStream
+     */
+    @Override
+    void flush() throws IOException {
+      super.flush();
+      out.flush();
+    }
+
+    @Override
+    public String toString() {
+      String sb = "FileBlock{"
+          + "destFile=" + bufferFile +
+          ", state=" + getState() +
+          ", dataSize=" + dataSize() +
+          ", limit=" + limit +
+          '}';
+      return sb;
+    }
+
+    /**
+     * An input stream which deletes the buffer file when closed.
+     */
+    private final class FileDeletingInputStream extends FilterInputStream {
+      private final AtomicBoolean closed = new AtomicBoolean(false);
+
+      FileDeletingInputStream(InputStream source) {
+        super(source);
+      }
+
+      /**
+       * Delete the input file when closed.
+       * @throws IOException IO problem
+       */
+      @Override
+      public void close() throws IOException {
+        try {
+          super.close();
+        } finally {
+          if (!closed.getAndSet(true)) {
+            if (!bufferFile.delete()) {
+              LOG.warn("delete({}) returned false",
+                  bufferFile.getAbsoluteFile());
+            }
+          }
+        }
+      }
+    }
+  }
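+
+  /*
+   * A minimal sketch of the disk-buffer lifecycle, assuming 'fs', 'limit'
+   * and 'data' variables: the block writes to a temp file created by the
+   * owner filesystem, and the file is deleted once the stream returned by
+   * startUpload() is closed.
+   *
+   *   DataBlock block = new DiskBlockFactory(fs).create(limit);
+   *   block.write(data, 0, data.length);
+   *   try (InputStream in = block.startUpload()) {
+   *     // ... upload from 'in' ...
+   *   }                           // FileDeletingInputStream deletes the file
+   *   block.close();
+   */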
+
+}

