HADOOP-10400. Incorporate new S3A FileSystem implementation. Contributed by 
Jordan Mendelson and Dave Wang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/24d920b8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/24d920b8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/24d920b8

Branch: refs/heads/trunk
Commit: 24d920b80eb3626073925a1d0b6dcf148add8cc0
Parents: fc741b5
Author: Aaron T. Myers <a...@apache.org>
Authored: Mon Sep 15 08:27:07 2014 -0700
Committer: Aaron T. Myers <a...@apache.org>
Committed: Mon Sep 15 08:27:07 2014 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |    3 +
 .../src/main/conf/log4j.properties              |    5 +
 .../src/main/resources/core-default.xml         |   86 ++
 hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml  |    8 +
 hadoop-project/pom.xml                          |   26 +-
 hadoop-tools/hadoop-aws/pom.xml                 |   10 +
 .../fs/s3a/AnonymousAWSCredentialsProvider.java |   37 +
 .../fs/s3a/BasicAWSCredentialsProvider.java     |   51 +
 .../org/apache/hadoop/fs/s3a/Constants.java     |   90 ++
 .../org/apache/hadoop/fs/s3a/S3AFileStatus.java |   62 ++
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 1019 ++++++++++++++++++
 .../apache/hadoop/fs/s3a/S3AInputStream.java    |  207 ++++
 .../apache/hadoop/fs/s3a/S3AOutputStream.java   |  208 ++++
 .../services/org.apache.hadoop.fs.FileSystem    |    1 +
 .../hadoop/fs/contract/s3a/S3AContract.java     |   43 +
 .../fs/contract/s3a/TestS3AContractCreate.java  |   38 +
 .../fs/contract/s3a/TestS3AContractDelete.java  |   31 +
 .../fs/contract/s3a/TestS3AContractMkdir.java   |   34 +
 .../fs/contract/s3a/TestS3AContractOpen.java    |   31 +
 .../fs/contract/s3a/TestS3AContractRename.java  |   64 ++
 .../fs/contract/s3a/TestS3AContractRootDir.java |   35 +
 .../fs/contract/s3a/TestS3AContractSeek.java    |   31 +
 .../fs/s3a/S3AFileSystemContractBaseTest.java   |  327 ++++++
 .../src/test/resources/contract/s3a.xml         |  105 ++
 .../src/test/resources/contract/s3n.xml         |    7 +-
 hadoop-tools/hadoop-azure/pom.xml               |   10 +-
 26 files changed, 2552 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
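For context: once hadoop-aws and the AWS SDK are on the classpath, the new filesystem is reachable through the standard FileSystem API via the s3a:// scheme. A minimal smoke-test sketch follows; the class name, bucket, and paths are placeholders, and credentials are assumed to come from configuration or an instance profile as wired up in S3AFileSystem.initialize() below.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class S3ASmokeTest {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // core-default.xml (changed below) maps fs.s3a.impl to S3AFileSystem,
        // so the s3a scheme resolves to the new implementation.
        FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
        Path p = new Path("s3a://example-bucket/tmp/hello.txt");
        try (FSDataOutputStream out = fs.create(p, true)) {
          out.write("hello s3a".getBytes("UTF-8"));   // data is uploaded on close()
        }
        System.out.println("length = " + fs.getFileStatus(p).getLen());
      }
    }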


http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 051eac1..c2ae5ed 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -342,6 +342,9 @@ Release 2.6.0 - UNRELEASED
     HADOOP-10893. isolated classloader on the client side (Sangjin Lee via
     jlowe)
 
+    HADOOP-10400. Incorporate new S3A FileSystem implementation. (Jordan
+    Mendelson and Dave Wang via atm)
+
   IMPROVEMENTS
 
     HADOOP-10808. Remove unused native code for munlock. (cnauroth)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-common-project/hadoop-common/src/main/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/conf/log4j.properties b/hadoop-common-project/hadoop-common/src/main/conf/log4j.properties
index ef9acbf..5fa21fa 100644
--- a/hadoop-common-project/hadoop-common/src/main/conf/log4j.properties
+++ b/hadoop-common-project/hadoop-common/src/main/conf/log4j.properties
@@ -174,6 +174,11 @@ log4j.appender.MRAUDIT.MaxBackupIndex=${mapred.audit.log.maxbackupindex}
 # Jets3t library
 log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
 
+# AWS SDK & S3A FileSystem
+log4j.logger.com.amazonaws=ERROR
+log4j.logger.com.amazonaws.http.AmazonHttpClient=ERROR
+log4j.logger.org.apache.hadoop.fs.s3a.S3AFileSystem=WARN
+
 #
 # Event Counter Appender
 # Sends counts of logging messages at different severity levels to Hadoop Metrics.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 3cc7545..828dec2 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -690,6 +690,92 @@ for ldap providers in the same way as above does.
 </property>
 
 <property>
+  <name>fs.s3a.access.key</name>
+  <description>AWS access key ID. Omit for Role-based authentication.</description>
+</property>
+
+<property>
+  <name>fs.s3a.secret.key</name>
+  <description>AWS secret key. Omit for Role-based authentication.</description>
+</property>
+
+<property>
+  <name>fs.s3a.connection.maximum</name>
+  <value>15</value>
+  <description>Controls the maximum number of simultaneous connections to S3.</description>
+</property>
+
+<property>
+  <name>fs.s3a.connection.ssl.enabled</name>
+  <value>true</value>
+  <description>Enables or disables SSL connections to S3.</description>
+</property>
+
+<property>
+  <name>fs.s3a.attempts.maximum</name>
+  <value>10</value>
+  <description>How many times we should retry commands on transient errors.</description>
+</property>
+
+<property>
+  <name>fs.s3a.connection.timeout</name>
+  <value>5000</value>
+  <description>Socket connection timeout in seconds.</description>
+</property>
+
+<property>
+  <name>fs.s3a.paging.maximum</name>
+  <value>5000</value>
+  <description>How many keys to request from S3 when doing 
+     directory listings at a time.</description>
+</property>
+
+<property>
+  <name>fs.s3a.multipart.size</name>
+  <value>104857600</value>
+  <description>How big (in bytes) to split upload or copy operations up into.</description>
+</property>
+
+<property>
+  <name>fs.s3a.multipart.threshold</name>
+  <value>2147483647</value>
+  <description>Threshold before uploads or copies use parallel multipart operations.</description>
+</property>
+
+<property>
+  <name>fs.s3a.acl.default</name>
+  <description>Set a canned ACL for newly created and copied objects. Value may be private, 
+     public-read, public-read-write, authenticated-read, log-delivery-write, 
+     bucket-owner-read, or bucket-owner-full-control.</description>
+</property>
+
+<property>
+  <name>fs.s3a.multipart.purge</name>
+  <value>false</value>
+  <description>True if you want to purge existing multipart uploads that may not have been
+     completed/aborted correctly</description>
+</property>
+
+<property>
+  <name>fs.s3a.multipart.purge.age</name>
+  <value>86400</value>
+  <description>Minimum age in seconds of multipart uploads to purge</description>
+</property>
+
+<property>
+  <name>fs.s3a.buffer.dir</name>
+  <value>${hadoop.tmp.dir}/s3a</value>
+  <description>Comma separated list of directories that will be used to buffer file 
+    uploads to.</description>
+</property>
+
+<property>
+  <name>fs.s3a.impl</name>
+  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
+  <description>The implementation class of the S3A Filesystem</description>
+</property>
+
+<property>
   <name>io.seqfile.compress.blocksize</name>
   <value>1000000</value>
   <description>The minimum block size for compression in block compressed 
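The same keys can be set programmatically instead of via core-site.xml. A minimal sketch, assuming a placeholder bucket and placeholder credential values; only keys introduced by this hunk are used:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class S3AConfigExample {
      public static FileSystem newS3A() throws Exception {
        Configuration conf = new Configuration();
        // Static credentials; omit both to fall back to instance-profile or anonymous access.
        conf.set("fs.s3a.access.key", "AKIAEXAMPLE");        // placeholder
        conf.set("fs.s3a.secret.key", "example-secret");     // placeholder
        // Tuning knobs documented above.
        conf.setInt("fs.s3a.connection.maximum", 30);
        conf.setBoolean("fs.s3a.connection.ssl.enabled", true);
        conf.setLong("fs.s3a.multipart.size", 128L * 1024 * 1024);
        return FileSystem.get(URI.create("s3a://example-bucket/"), conf);
      }
    }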

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml
index 24fa87b..a44f686 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml
@@ -131,6 +131,10 @@
           <artifactId>jets3t</artifactId>
         </exclusion>
         <exclusion>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk</artifactId>
+        </exclusion>
+        <exclusion>
           <groupId>org.eclipse.jdt</groupId>
           <artifactId>core</artifactId>
         </exclusion>
@@ -170,6 +174,10 @@
           <artifactId>jets3t</artifactId>
         </exclusion>
         <exclusion>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk</artifactId>
+        </exclusion>
+        <exclusion>
           <groupId>org.eclipse.jdt</groupId>
           <artifactId>core</artifactId>
         </exclusion>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index ad8422f..502655f 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -61,8 +61,9 @@
     <!-- jersey version -->
     <jersey.version>1.9</jersey.version>
 
-    <!-- jackson version -->
+    <!-- jackson versions -->
     <jackson.version>1.9.13</jackson.version>
+    <jackson2.version>2.2.3</jackson2.version>
 
     <!-- ProtocolBuffer version, used to verify the protoc version and -->
     <!-- define the protobuf JAR version                               -->
@@ -581,13 +582,7 @@
       <dependency>
         <groupId>com.amazonaws</groupId>
         <artifactId>aws-java-sdk</artifactId>
-        <version>1.7.2</version>
-        <exclusions>
-          <exclusion>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
-          </exclusion>
-        </exclusions>
+        <version>1.7.4</version>
       </dependency>
       <dependency>
         <groupId>org.apache.mina</groupId>
@@ -675,6 +670,21 @@
         <version>${jackson.version}</version>
       </dependency>
       <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-core</artifactId>
+        <version>${jackson2.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-databind</artifactId>
+        <version>${jackson2.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-annotations</artifactId>
+        <version>${jackson2.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.mockito</groupId>
         <artifactId>mockito-all</artifactId>
         <version>1.8.5</version>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index c01a33d..61a5e84 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -100,6 +100,16 @@
       <type>test-jar</type>
     </dependency>
 
+    <!-- see ../../hadoop-project/pom.xml for versions -->
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>com.amazonaws</groupId>
       <artifactId>aws-java-sdk</artifactId>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java
new file mode 100644
index 0000000..2a24273
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AnonymousAWSCredentials;
+import com.amazonaws.auth.AWSCredentials;
+
+public class AnonymousAWSCredentialsProvider implements AWSCredentialsProvider {
+  public AWSCredentials getCredentials() {
+    return new AnonymousAWSCredentials();
+  }
+
+  public void refresh() {}
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java
new file mode 100644
index 0000000..8d45bc6
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.AWSCredentials;
+
+public class BasicAWSCredentialsProvider implements AWSCredentialsProvider {
+  private String accessKey;
+  private String secretKey;
+
+  public BasicAWSCredentialsProvider(String accessKey, String secretKey) {
+    this.accessKey = accessKey;
+    this.secretKey = secretKey;
+  }
+
+  public AWSCredentials getCredentials() {
+    if (accessKey != null && secretKey != null) {
+      return new BasicAWSCredentials(accessKey, secretKey);
+    }
+
+    throw new AmazonClientException(
+        "Access key or secret key is null");
+  }
+
+  public void refresh() {}
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
new file mode 100644
index 0000000..9723b82
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+
+public class Constants {
+  // s3 access key
+  public static final String OLD_ACCESS_KEY = "fs.s3a.awsAccessKeyId";
+  public static final String NEW_ACCESS_KEY = "fs.s3a.access.key";
+
+  // s3 secret key
+  public static final String OLD_SECRET_KEY = "fs.s3a.awsSecretAccessKey";
+  public static final String NEW_SECRET_KEY = "fs.s3a.secret.key";
+  
+  // number of simultaneous connections to s3
+  public static final String OLD_MAXIMUM_CONNECTIONS = "fs.s3a.maxConnections";
+  public static final String NEW_MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum";
+  public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15;
+  
+  // connect to s3 over ssl?
+  public static final String OLD_SECURE_CONNECTIONS = "fs.s3a.secureConnections";
+  public static final String NEW_SECURE_CONNECTIONS = "fs.s3a.connection.ssl.enabled";
+  public static final boolean DEFAULT_SECURE_CONNECTIONS = true;
+  
+  // number of times we should retry errors
+  public static final String OLD_MAX_ERROR_RETRIES = "fs.s3a.maxErrorRetries";
+  public static final String NEW_MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";
+  public static final int DEFAULT_MAX_ERROR_RETRIES = 10;
+  
+  // seconds until we give up on a connection to s3
+  public static final String OLD_SOCKET_TIMEOUT = "fs.s3a.socketTimeout";
+  public static final String NEW_SOCKET_TIMEOUT = "fs.s3a.connection.timeout";
+  public static final int DEFAULT_SOCKET_TIMEOUT = 50000;
+
+  // number of records to get while paging through a directory listing
+  public static final String OLD_MAX_PAGING_KEYS = "fs.s3a.maxPagingKeys";
+  public static final String NEW_MAX_PAGING_KEYS = "fs.s3a.paging.maximum";
+  public static final int DEFAULT_MAX_PAGING_KEYS = 5000;
+
+  // size of each multipart piece in bytes
+  public static final String OLD_MULTIPART_SIZE = "fs.s3a.multipartSize";
+  public static final String NEW_MULTIPART_SIZE = "fs.s3a.multipart.size";
+  public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB
+  
+  // minimum size in bytes before we start a multipart uploads or copy
+  public static final String OLD_MIN_MULTIPART_THRESHOLD = "fs.s3a.minMultipartSize";
+  public static final String NEW_MIN_MULTIPART_THRESHOLD = "fs.s3a.multipart.threshold";
+  public static final int DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE;
+  
+  // comma separated list of directories
+  public static final String BUFFER_DIR = "fs.s3a.buffer.dir";
+
+  // private | public-read | public-read-write | authenticated-read | 
+  // log-delivery-write | bucket-owner-read | bucket-owner-full-control
+  public static final String OLD_CANNED_ACL = "fs.s3a.cannedACL";
+  public static final String NEW_CANNED_ACL = "fs.s3a.acl.default";
+  public static final String DEFAULT_CANNED_ACL = "";
+
+  // should we try to purge old multipart uploads when starting up
+  public static final String OLD_PURGE_EXISTING_MULTIPART = "fs.s3a.purgeExistingMultiPart";
+  public static final String NEW_PURGE_EXISTING_MULTIPART = "fs.s3a.multipart.purge";
+  public static final boolean DEFAULT_PURGE_EXISTING_MULTIPART = false;
+
+  // purge any multipart uploads older than this number of seconds
+  public static final String OLD_PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.purgeExistingMultiPartAge";
+  public static final String NEW_PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.multipart.purge.age";
+  public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400;
+
+  // s3 server-side encryption
+  public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM = 
+    "fs.s3a.server-side-encryption-algorithm";
+  
+  public static final String S3N_FOLDER_SUFFIX = "_$folder$";
+}
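Each tunable above carries both an old camelCase key and a new dotted key; S3AFileSystem reads the new key first and falls back to the old one, then to the default. A small sketch of that lookup pattern (the helper class is illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import static org.apache.hadoop.fs.s3a.Constants.*;

    public class ConfigFallbackExample {
      // Mirrors the lookup done in S3AFileSystem.initialize(): prefer
      // fs.s3a.connection.maximum, then fs.s3a.maxConnections, then the default.
      static int maxConnections(Configuration conf) {
        return conf.getInt(NEW_MAXIMUM_CONNECTIONS,
            conf.getInt(OLD_MAXIMUM_CONNECTIONS, DEFAULT_MAXIMUM_CONNECTIONS));
      }
    }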

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
new file mode 100644
index 0000000..eb64492
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+public class S3AFileStatus extends FileStatus {
+  private boolean isEmptyDirectory;
+
+  // Directories
+  public S3AFileStatus(boolean isdir, boolean isemptydir, Path path) {
+    super(0, isdir, 1, 0, 0, path);
+    isEmptyDirectory = isemptydir;
+  }
+
+  // Files
+  public S3AFileStatus(long length, long modification_time, Path path) {
+    super(length, false, 1, 0, modification_time, path);
+    isEmptyDirectory = false;
+  }
+
+  public boolean isEmptyDirectory() {
+    return isEmptyDirectory;
+  }
+  
+  /** Compare if this object is equal to another object
+   * @param   o the object to be compared.
+   * @return  true if the two file statuses have the same path name; false if not.
+   */
+  @Override
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+  
+  /**
+   * Returns a hash code value for the object, which is defined as
+   * the hash code of the path name.
+   *
+   * @return  a hash code value for the path name.
+   */
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
+}
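The only state S3AFileStatus adds over FileStatus is whether a directory is an empty marker object. An illustrative sketch of the two constructors (bucket and paths are placeholders):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.s3a.S3AFileStatus;

    public class FileStatusExample {
      public static void main(String[] args) {
        // A directory marker with no children under it.
        S3AFileStatus dir = new S3AFileStatus(true, true,
            new Path("s3a://example-bucket/logs"));
        // A 1024-byte object modified "now".
        S3AFileStatus file = new S3AFileStatus(1024, System.currentTimeMillis(),
            new Path("s3a://example-bucket/logs/part-00000"));
        System.out.println(dir.isDirectory() + " " + dir.isEmptyDirectory());   // true true
        System.out.println(file.isFile() + " " + file.isEmptyDirectory());      // true false
      }
    }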

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
new file mode 100644
index 0000000..a597e62
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -0,0 +1,1019 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentialsProviderChain;
+
+import com.amazonaws.auth.InstanceProfileCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.transfer.Copy;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
+import com.amazonaws.services.s3.transfer.Upload;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.event.ProgressEvent;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class S3AFileSystem extends FileSystem {
+  private URI uri;
+  private Path workingDir;
+  private AmazonS3Client s3;
+  private String bucket;
+  private int maxKeys;
+  private long partSize;
+  private int partSizeThreshold;
+  public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
+  private CannedAccessControlList cannedACL;
+  private String serverSideEncryptionAlgorithm;
+
+
+  /** Called after a new FileSystem instance is constructed.
+   * @param name a uri whose authority section names the host, port, etc.
+   *   for this FileSystem
+   * @param conf the configuration
+   */
+  public void initialize(URI name, Configuration conf) throws IOException {
+    super.initialize(name, conf);
+
+
+    uri = URI.create(name.getScheme() + "://" + name.getAuthority());
+    workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri,
+        this.getWorkingDirectory());
+
+    // Try to get our credentials or just connect anonymously
+    String accessKey = conf.get(NEW_ACCESS_KEY, conf.get(OLD_ACCESS_KEY, null));
+    String secretKey = conf.get(NEW_SECRET_KEY, conf.get(OLD_SECRET_KEY, null));
+
+    String userInfo = name.getUserInfo();
+    if (userInfo != null) {
+      int index = userInfo.indexOf(':');
+      if (index != -1) {
+        accessKey = userInfo.substring(0, index);
+        secretKey = userInfo.substring(index + 1);
+      } else {
+        accessKey = userInfo;
+      }
+    }
+
+    AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain(
+        new BasicAWSCredentialsProvider(accessKey, secretKey),
+        new InstanceProfileCredentialsProvider(),
+        new AnonymousAWSCredentialsProvider()
+    );
+
+    bucket = name.getHost();
+
+    ClientConfiguration awsConf = new ClientConfiguration();
+    awsConf.setMaxConnections(conf.getInt(NEW_MAXIMUM_CONNECTIONS, 
+      conf.getInt(OLD_MAXIMUM_CONNECTIONS, DEFAULT_MAXIMUM_CONNECTIONS)));
+    awsConf.setProtocol(conf.getBoolean(NEW_SECURE_CONNECTIONS, 
+      conf.getBoolean(OLD_SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) ? 
+        Protocol.HTTPS : Protocol.HTTP);
+    awsConf.setMaxErrorRetry(conf.getInt(NEW_MAX_ERROR_RETRIES, 
+      conf.getInt(OLD_MAX_ERROR_RETRIES, DEFAULT_MAX_ERROR_RETRIES)));
+    awsConf.setSocketTimeout(conf.getInt(NEW_SOCKET_TIMEOUT, 
+      conf.getInt(OLD_SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT)));
+
+    s3 = new AmazonS3Client(credentials, awsConf);
+
+    maxKeys = conf.getInt(NEW_MAX_PAGING_KEYS, 
+      conf.getInt(OLD_MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS));
+    partSize = conf.getLong(NEW_MULTIPART_SIZE, 
+      conf.getLong(OLD_MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE));
+    partSizeThreshold = conf.getInt(NEW_MIN_MULTIPART_THRESHOLD, 
+      conf.getInt(OLD_MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD));
+
+    if (partSize < 5 * 1024 * 1024) {
+      LOG.error(NEW_MULTIPART_SIZE + " must be at least 5 MB");
+      partSize = 5 * 1024 * 1024;
+    }
+
+    if (partSizeThreshold < 5 * 1024 * 1024) {
+      LOG.error(NEW_MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
+      partSizeThreshold = 5 * 1024 * 1024;
+    }
+
+    String cannedACLName = conf.get(NEW_CANNED_ACL, 
+      conf.get(OLD_CANNED_ACL, DEFAULT_CANNED_ACL));
+    if (!cannedACLName.isEmpty()) {
+      cannedACL = CannedAccessControlList.valueOf(cannedACLName);
+    } else {
+      cannedACL = null;
+    }
+
+    if (!s3.doesBucketExist(bucket)) {
+      throw new IOException("Bucket " + bucket + " does not exist");
+    }
+
+    boolean purgeExistingMultipart = conf.getBoolean(NEW_PURGE_EXISTING_MULTIPART, 
+      conf.getBoolean(OLD_PURGE_EXISTING_MULTIPART, DEFAULT_PURGE_EXISTING_MULTIPART));
+    long purgeExistingMultipartAge = conf.getLong(NEW_PURGE_EXISTING_MULTIPART_AGE, 
+      conf.getLong(OLD_PURGE_EXISTING_MULTIPART_AGE, DEFAULT_PURGE_EXISTING_MULTIPART_AGE));
+
+    if (purgeExistingMultipart) {
+      TransferManager transferManager = new TransferManager(s3);
+      Date purgeBefore = new Date(new Date().getTime() - purgeExistingMultipartAge*1000);
+
+      transferManager.abortMultipartUploads(bucket, purgeBefore);
+      transferManager.shutdownNow(false);
+    }
+
+    serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM);
+
+    setConf(conf);
+  }
+
+  /**
+   * Return the protocol scheme for the FileSystem.
+   *
+   * @return "s3a"
+   */
+  public String getScheme() {
+    return "s3a";
+  }
+
+  /** Returns a URI whose scheme and authority identify this FileSystem.*/
+  public URI getUri() {
+    return uri;
+  }
+
+
+  public S3AFileSystem() {
+    super();
+  }
+
+  /* Turns a path (relative or otherwise) into an S3 key
+   */
+  private String pathToKey(Path path) {
+    if (!path.isAbsolute()) {
+      path = new Path(workingDir, path);
+    }
+
+    if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
+      return "";
+    }
+
+    return path.toUri().getPath().substring(1);
+  }
+
+  private Path keyToPath(String key) {
+    return new Path("/" + key);
+  }
+
+  /**
+   * Opens an FSDataInputStream at the indicated Path.
+   * @param f the file name to open
+   * @param bufferSize the size of the buffer to be used.
+   */
+  public FSDataInputStream open(Path f, int bufferSize)
+      throws IOException {
+
+    LOG.info("Opening '" + f + "' for reading");
+    final FileStatus fileStatus = getFileStatus(f);
+    if (fileStatus.isDirectory()) {
+      throw new FileNotFoundException("Can't open " + f + " because it is a directory");
+    }
+
+    return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f), 
+      fileStatus.getLen(), s3, statistics));
+  }
+
+  /**
+   * Create an FSDataOutputStream at the indicated Path with write-progress
+   * reporting.
+   * @param f the file name to open
+   * @param permission
+   * @param overwrite if a file with this name already exists, then if true,
+   *   the file will be overwritten, and if false an error will be thrown.
+   * @param bufferSize the size of the buffer to be used.
+   * @param replication required block replication for the file.
+   * @param blockSize
+   * @param progress
+   * @throws IOException
+   * @see #setPermission(Path, FsPermission)
+   */
+  public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, 
+    int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
+    String key = pathToKey(f);
+
+    if (!overwrite && exists(f)) {
+      throw new FileAlreadyExistsException(f + " already exists");
+    }
+
+    // We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file
+    return new FSDataOutputStream(new S3AOutputStream(getConf(), s3, this, 
+      bucket, key, progress, cannedACL, statistics, 
+      serverSideEncryptionAlgorithm), null);
+  }
+
+  /**
+   * Append to an existing file (optional operation).
+   * @param f the existing file to be appended.
+   * @param bufferSize the size of the buffer to be used.
+   * @param progress for reporting progress if it is not null.
+   * @throws IOException
+   */
+  public FSDataOutputStream append(Path f, int bufferSize, 
+    Progressable progress) throws IOException {
+    throw new IOException("Not supported");
+  }
+
+
+  /**
+   * Renames Path src to Path dst.  Can take place on local fs
+   * or remote DFS.
+   *
+   * Warning: S3 does not support renames. This method does a copy which can 
+   * take S3 some time to execute with large files and directories. Since 
+   * there is no Progressable passed in, this can time out jobs.
+   *
+   * Note: This implementation differs from other S3 drivers. Specifically:
+   *       Fails if src is a file and dst is a directory.
+   *       Fails if src is a directory and dst is a file.
+   *       Fails if the parent of dst does not exist or is a file.
+   *       Fails if dst is a directory that is not empty.
+   *
+   * @param src path to be renamed
+   * @param dst new path after rename
+   * @throws IOException on failure
+   * @return true if rename is successful
+   */
+  public boolean rename(Path src, Path dst) throws IOException {
+    LOG.info("Rename path " + src + " to " + dst);
+
+    String srcKey = pathToKey(src);
+    String dstKey = pathToKey(dst);
+
+    if (srcKey.length() == 0 || dstKey.length() == 0) {
+      LOG.info("rename: src or dst are empty");
+      return false;
+    }
+
+    if (srcKey.equals(dstKey)) {
+      LOG.info("rename: src and dst refer to the same file");
+      return true;
+    }
+
+    S3AFileStatus srcStatus;
+    try {
+      srcStatus = getFileStatus(src);
+    } catch (FileNotFoundException e) {
+      LOG.info("rename: src not found " + src);
+      return false;
+    }
+
+    S3AFileStatus dstStatus = null;
+    try {
+      dstStatus = getFileStatus(dst);
+
+      if (srcStatus.isFile() && dstStatus.isDirectory()) {
+        LOG.info("rename: src is a file and dst is a directory");
+        return false;
+      }
+
+      if (srcStatus.isDirectory() && dstStatus.isFile()) {
+        LOG.info("rename: src is a directory and dst is a file");
+        return false;
+      }
+
+    } catch (FileNotFoundException e) {
+      // Parent must exist
+      Path parent = dst.getParent();
+      if (!pathToKey(parent).isEmpty()) {
+        try {
+          S3AFileStatus dstParentStatus = getFileStatus(dst.getParent());
+          if (!dstParentStatus.isDirectory()) {
+            return false;
+          }
+        } catch (FileNotFoundException e2) {
+          return false;
+        }
+      }
+    }
+
+    // Ok! Time to start
+    if (srcStatus.isFile()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("rename: renaming file " + src + " to " + dst);
+      }
+      copyFile(srcKey, dstKey);
+      delete(src, false);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("rename: renaming directory " + src + " to " + dst);
+      }
+
+      // This is a directory to directory copy
+      if (!dstKey.endsWith("/")) {
+        dstKey = dstKey + "/";
+      }
+
+      if (!srcKey.endsWith("/")) {
+        srcKey = srcKey + "/";
+      }
+
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete = 
+        new ArrayList<DeleteObjectsRequest.KeyVersion>();
+      if (dstStatus != null && dstStatus.isEmptyDirectory()) {
+        copyFile(srcKey, dstKey);
+        statistics.incrementWriteOps(1);
+        keysToDelete.add(new DeleteObjectsRequest.KeyVersion(srcKey));
+      }
+
+      ListObjectsRequest request = new ListObjectsRequest();
+      request.setBucketName(bucket);
+      request.setPrefix(srcKey);
+      request.setMaxKeys(maxKeys);
+
+      ObjectListing objects = s3.listObjects(request);
+      statistics.incrementReadOps(1);
+
+      while (true) {
+        for (S3ObjectSummary summary : objects.getObjectSummaries()) {
+          keysToDelete.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
+          String newDstKey = dstKey + summary.getKey().substring(srcKey.length());
+          copyFile(summary.getKey(), newDstKey);
+        }
+
+        if (objects.isTruncated()) {
+          objects = s3.listNextBatchOfObjects(objects);
+          statistics.incrementReadOps(1);
+        } else {
+          break;
+        }
+      }
+
+
+      if (!keysToDelete.isEmpty()) {
+        DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket);
+        deleteRequest.setKeys(keysToDelete);
+        s3.deleteObjects(deleteRequest);
+        statistics.incrementWriteOps(1);
+      }
+    }
+
+    if (src.getParent() != dst.getParent()) {
+      deleteUnnecessaryFakeDirectories(dst.getParent());
+      createFakeDirectoryIfNecessary(src.getParent());
+    }
+    return true;
+  }
+
+  /** Delete a file.
+   *
+   * @param f the path to delete.
+   * @param recursive if path is a directory and set to
+   * true, the directory is deleted else throws an exception. In
+   * case of a file the recursive can be set to either true or false.
+   * @return  true if delete is successful else false.
+   * @throws IOException
+   */
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    LOG.info("Delete path " + f + " - recursive " + recursive);
+    S3AFileStatus status;
+    try {
+      status = getFileStatus(f);
+    } catch (FileNotFoundException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Couldn't delete " + f + " - does not exist");
+      }
+      return false;
+    }
+
+    String key = pathToKey(f);
+
+    if (status.isDirectory()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("delete: Path is a directory");
+      }
+
+      if (!recursive && !status.isEmptyDirectory()) {
+        throw new IOException("Path is a folder: " + f + 
+                              " and it is not an empty directory");
+      }
+
+      if (!key.endsWith("/")) {
+        key = key + "/";
+      }
+
+      if (key.equals("/")) {
+        LOG.info("s3a cannot delete the root directory");
+        return false;
+      }
+
+      if (status.isEmptyDirectory()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Deleting fake empty directory");
+        }
+        s3.deleteObject(bucket, key);
+        statistics.incrementWriteOps(1);
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Getting objects for directory prefix " + key + " to 
delete");
+        }
+
+        ListObjectsRequest request = new ListObjectsRequest();
+        request.setBucketName(bucket);
+        request.setPrefix(key);
+        // Hopefully not setting a delimiter will cause this to find everything
+        //request.setDelimiter("/");
+        request.setMaxKeys(maxKeys);
+
+        List<DeleteObjectsRequest.KeyVersion> keys = 
+          new ArrayList<DeleteObjectsRequest.KeyVersion>();
+        ObjectListing objects = s3.listObjects(request);
+        statistics.incrementReadOps(1);
+        while (true) {
+          for (S3ObjectSummary summary : objects.getObjectSummaries()) {
+            keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Got object to delete " + summary.getKey());
+            }
+          }
+
+          DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket);
+          deleteRequest.setKeys(keys);
+          s3.deleteObjects(deleteRequest);
+          statistics.incrementWriteOps(1);
+          keys.clear();
+
+          if (objects.isTruncated()) {
+            objects = s3.listNextBatchOfObjects(objects);
+            statistics.incrementReadOps(1);
+          } else {
+            break;
+          }
+        }
+      }
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("delete: Path is a file");
+      }
+      s3.deleteObject(bucket, key);
+      statistics.incrementWriteOps(1);
+    }
+
+    createFakeDirectoryIfNecessary(f.getParent());
+
+    return true;
+  }
+
+  private void createFakeDirectoryIfNecessary(Path f) throws IOException {
+    String key = pathToKey(f);
+    if (!key.isEmpty() && !exists(f)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Creating new fake directory at " + f);
+      }
+      createFakeDirectory(bucket, key);
+    }
+  }
+
+  /**
+   * List the statuses of the files/directories in the given path if the path is
+   * a directory.
+   *
+   * @param f given path
+   * @return the statuses of the files/directories in the given path
+   * @throws FileNotFoundException when the path does not exist;
+   *         IOException see specific implementation
+   */
+  public FileStatus[] listStatus(Path f) throws FileNotFoundException,
+      IOException {
+    String key = pathToKey(f);
+    LOG.info("List status for path: " + f);
+
+    final List<FileStatus> result = new ArrayList<FileStatus>();
+    final FileStatus fileStatus =  getFileStatus(f);
+
+    if (fileStatus.isDirectory()) {
+      if (!key.isEmpty()) {
+        key = key + "/";
+      }
+
+      ListObjectsRequest request = new ListObjectsRequest();
+      request.setBucketName(bucket);
+      request.setPrefix(key);
+      request.setDelimiter("/");
+      request.setMaxKeys(maxKeys);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("listStatus: doing listObjects for directory " + key);
+      }
+
+      ObjectListing objects = s3.listObjects(request);
+      statistics.incrementReadOps(1);
+
+      while (true) {
+        for (S3ObjectSummary summary : objects.getObjectSummaries()) {
+          Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, workingDir);
+          // Skip over keys that are ourselves and old S3N _$folder$ files
+          if (keyPath.equals(f) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Ignoring: " + keyPath);
+            }
+            continue;
+          }
+
+          if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
+            result.add(new S3AFileStatus(true, true, keyPath));
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Adding: fd: " + keyPath);
+            }
+          } else {
+            result.add(new S3AFileStatus(summary.getSize(), 
+              dateToLong(summary.getLastModified()), keyPath));
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Adding: fi: " + keyPath);
+            }
+          }
+        }
+
+        for (String prefix : objects.getCommonPrefixes()) {
+          Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
+          if (keyPath.equals(f)) {
+            continue;
+          }
+          result.add(new S3AFileStatus(true, false, keyPath));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Adding: rd: " + keyPath);
+          }
+        }
+
+        if (objects.isTruncated()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("listStatus: list truncated - getting next batch");
+          }
+
+          objects = s3.listNextBatchOfObjects(objects);
+          statistics.incrementReadOps(1);
+        } else {
+          break;
+        }
+      }
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding: rd (not a dir): " + f);
+      }
+      result.add(fileStatus);
+    }
+
+    return result.toArray(new FileStatus[result.size()]);
+  }
+
+
+
+  /**
+   * Set the current working directory for the given file system. All relative
+   * paths will be resolved relative to it.
+   *
+   * @param new_dir
+   */
+  public void setWorkingDirectory(Path new_dir) {
+    workingDir = new_dir;
+  }
+
+  /**
+   * Get the current working directory for the given file system
+   * @return the directory pathname
+   */
+  public Path getWorkingDirectory() {
+    return workingDir;
+  }
+
+  /**
+   * Make the given file and all non-existent parents into
+   * directories. Has the semantics of Unix 'mkdir -p'.
+   * Existence of the directory hierarchy is not an error.
+   * @param f path to create
+   * @param permission to apply to f
+   */
+  // TODO: If we have created an empty file at /foo/bar and we then call 
+  // mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    LOG.info("Making directory: " + f);
+
+    try {
+      FileStatus fileStatus = getFileStatus(f);
+
+      if (fileStatus.isDirectory()) {
+        return true;
+      } else {
+        throw new FileAlreadyExistsException("Path is a file: " + f);
+      }
+    } catch (FileNotFoundException e) {
+      Path fPart = f;
+      do {
+        try {
+          FileStatus fileStatus = getFileStatus(fPart);
+          if (fileStatus.isFile()) {
+            throw new FileAlreadyExistsException(String.format(
+                "Can't make directory for path '%s' since it is a file.", 
+                fPart));
+          }
+        } catch (FileNotFoundException fnfe) {
+        }
+        fPart = fPart.getParent();
+      } while (fPart != null);
+
+      String key = pathToKey(f);
+      createFakeDirectory(bucket, key);
+      return true;
+    }
+  }
+
+  /**
+   * Return a file status object that represents the path.
+   * @param f The path we want information from
+   * @return a FileStatus object
+   * @throws java.io.FileNotFoundException when the path does not exist;
+   *         IOException see specific implementation
+   */
+  public S3AFileStatus getFileStatus(Path f) throws IOException {
+    String key = pathToKey(f);
+
+    LOG.info("Getting path status for " + f + " (" + key + ")");
+
+    if (!key.isEmpty()) {
+      try {
+        ObjectMetadata meta = s3.getObjectMetadata(bucket, key);
+        statistics.incrementReadOps(1);
+
+        if (objectRepresentsDirectory(key, meta.getContentLength())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Found exact file: fake directory");
+          }
+          return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir));
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Found exact file: normal file");
+          }
+          return new S3AFileStatus(meta.getContentLength(), dateToLong(meta.getLastModified()),
+              f.makeQualified(uri, workingDir));
+        }
+      } catch (AmazonServiceException e) {
+        if (e.getStatusCode() != 404) {
+          printAmazonServiceException(e);
+          throw e;
+        }
+      } catch (AmazonClientException e) {
+        printAmazonClientException(e);
+        throw e;
+      }
+
+      // Necessary?
+      if (!key.endsWith("/")) {
+        try {
+          String newKey = key + "/";
+          ObjectMetadata meta = s3.getObjectMetadata(bucket, newKey);
+          statistics.incrementReadOps(1);
+
+          if (objectRepresentsDirectory(newKey, meta.getContentLength())) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Found file (with /): fake directory");
+            }
+            return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir));
+          } else {
+            LOG.warn("Found file (with /): real file? should not happen: " + 
key);
+
+            return new S3AFileStatus(meta.getContentLength(), dateToLong(meta.getLastModified()),
+                f.makeQualified(uri, workingDir));
+          }
+        } catch (AmazonServiceException e) {
+          if (e.getStatusCode() != 404) {
+            printAmazonServiceException(e);
+            throw e;
+          }
+        } catch (AmazonClientException e) {
+          printAmazonClientException(e);
+          throw e;
+        }
+      }
+    }
+
+    try {
+      if (!key.isEmpty() && !key.endsWith("/")) {
+        key = key + "/";
+      }
+      ListObjectsRequest request = new ListObjectsRequest();
+      request.setBucketName(bucket);
+      request.setPrefix(key);
+      request.setDelimiter("/");
+      request.setMaxKeys(1);
+
+      ObjectListing objects = s3.listObjects(request);
+      statistics.incrementReadOps(1);
+
+      if (objects.getCommonPrefixes().size() > 0 || objects.getObjectSummaries().size() > 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Found path as directory (with /): " + 
+            objects.getCommonPrefixes().size() + "/" + 
+            objects.getObjectSummaries().size());
+
+          for (S3ObjectSummary summary : objects.getObjectSummaries()) {
+            LOG.debug("Summary: " + summary.getKey() + " " + 
summary.getSize());
+          }
+          for (String prefix : objects.getCommonPrefixes()) {
+            LOG.debug("Prefix: " + prefix);
+          }
+        }
+
+        return new S3AFileStatus(true, false, f.makeQualified(uri, workingDir));
+      }
+    } catch (AmazonServiceException e) {
+      if (e.getStatusCode() != 404) {
+        printAmazonServiceException(e);
+        throw e;
+      }
+    } catch (AmazonClientException e) {
+      printAmazonClientException(e);
+      throw e;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Not Found: " + f);
+    }
+    throw new FileNotFoundException("No such file or directory: " + f);
+  }
+
+  /**
+   * The src file is on the local disk.  Add it to FS at
+   * the given dst name.
+   *
+   * This version doesn't need to create a temporary file to calculate the md5.
+   * Sadly this doesn't seem to be used by the shell cp :(
+   *
+   * delSrc indicates if the source should be removed
+   * @param delSrc whether to delete the src
+   * @param overwrite whether to overwrite an existing file
+   * @param src path
+   * @param dst path
+   */
+  @Override
+  public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, 
+    Path dst) throws IOException {
+    String key = pathToKey(dst);
+
+    if (!overwrite && exists(dst)) {
+      throw new IOException(dst + " already exists");
+    }
+
+    LOG.info("Copying local file from " + src + " to " + dst);
+
+    // Since we have a local file, we don't need to stream into a temporary file
+    LocalFileSystem local = getLocal(getConf());
+    File srcfile = local.pathToFile(src);
+
+    TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
+    transferConfiguration.setMinimumUploadPartSize(partSize);
+    transferConfiguration.setMultipartUploadThreshold(partSizeThreshold);
+
+    TransferManager transfers = new TransferManager(s3);
+    transfers.setConfiguration(transferConfiguration);
+
+    final ObjectMetadata om = new ObjectMetadata();
+    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
+      om.setServerSideEncryption(serverSideEncryptionAlgorithm);
+    }
+    PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, srcfile);
+    putObjectRequest.setCannedAcl(cannedACL);
+    putObjectRequest.setMetadata(om);
+
+    ProgressListener progressListener = new ProgressListener() {
+      public void progressChanged(ProgressEvent progressEvent) {
+        switch (progressEvent.getEventCode()) {
+          case ProgressEvent.PART_COMPLETED_EVENT_CODE:
+            statistics.incrementWriteOps(1);
+            break;
+        }
+      }
+    };
+
+    Upload up = transfers.upload(putObjectRequest);
+    up.addProgressListener(progressListener);
+    try {
+      up.waitForUploadResult();
+      statistics.incrementWriteOps(1);
+    } catch (InterruptedException e) {
+      throw new IOException("Got interrupted, cancelling");
+    } finally {
+      transfers.shutdownNow(false);
+    }
+
+    // This will delete unnecessary fake parent directories
+    finishedWrite(key);
+
+    if (delSrc) {
+      local.delete(src, false);
+    }
+  }
+
+  /**
+  * Override getCanonicalServiceName because we don't support tokens in S3A
+  */
+  @Override
+  public String getCanonicalServiceName() {
+    // Does not support Token
+    return null;
+  }
+
+  private void copyFile(String srcKey, String dstKey) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("copyFile " + srcKey + " -> " + dstKey);
+    }
+
+    TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
+    transferConfiguration.setMultipartCopyPartSize(partSize);
+
+    TransferManager transfers = new TransferManager(s3);
+    transfers.setConfiguration(transferConfiguration);
+
+    ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
+    final ObjectMetadata dstom = srcom.clone();
+    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
+      dstom.setServerSideEncryption(serverSideEncryptionAlgorithm);
+    }
+    CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
+    copyObjectRequest.setCannedAccessControlList(cannedACL);
+    copyObjectRequest.setNewObjectMetadata(dstom);
+
+    ProgressListener progressListener = new ProgressListener() {
+      public void progressChanged(ProgressEvent progressEvent) {
+        switch (progressEvent.getEventCode()) {
+          case ProgressEvent.PART_COMPLETED_EVENT_CODE:
+            statistics.incrementWriteOps(1);
+            break;
+        }
+      }
+    };
+
+    Copy copy = transfers.copy(copyObjectRequest);
+    copy.addProgressListener(progressListener);
+    try {
+      copy.waitForCopyResult();
+      statistics.incrementWriteOps(1);
+    } catch (InterruptedException e) {
+      throw new IOException("Got interrupted, cancelling");
+    } finally {
+      transfers.shutdownNow(false);
+    }
+  }
+
+  private boolean objectRepresentsDirectory(final String name, final long size) {
+    return !name.isEmpty() && name.charAt(name.length() - 1) == '/' && size == 0L;
+  }
+
+  // Handles null Dates that can be returned by AWS
+  private static long dateToLong(final Date date) {
+    if (date == null) {
+      return 0L;
+    }
+
+    return date.getTime();
+  }
+
+  public void finishedWrite(String key) throws IOException {
+    deleteUnnecessaryFakeDirectories(keyToPath(key).getParent());
+  }
+
+  private void deleteUnnecessaryFakeDirectories(Path f) throws IOException {
+    while (true) {
+      try {
+        String key = pathToKey(f);
+        if (key.isEmpty()) {
+          break;
+        }
+
+        S3AFileStatus status = getFileStatus(f);
+
+        if (status.isDirectory() && status.isEmptyDirectory()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Deleting fake directory " + key + "/");
+          }
+          s3.deleteObject(bucket, key + "/");
+          statistics.incrementWriteOps(1);
+        }
+      } catch (FileNotFoundException e) {
+      } catch (AmazonServiceException e) {}
+
+      if (f.isRoot()) {
+        break;
+      }
+
+      f = f.getParent();
+    }
+  }
+
+
+  private void createFakeDirectory(final String bucketName, final String objectName)
+      throws AmazonClientException, AmazonServiceException {
+    if (!objectName.endsWith("/")) {
+      createEmptyObject(bucketName, objectName + "/");
+    } else {
+      createEmptyObject(bucketName, objectName);
+    }
+  }
+
+  // Used to create an empty file that represents an empty directory
+  private void createEmptyObject(final String bucketName, final String objectName)
+      throws AmazonClientException, AmazonServiceException {
+    final InputStream im = new InputStream() {
+      @Override
+      public int read() throws IOException {
+        return -1;
+      }
+    };
+
+    final ObjectMetadata om = new ObjectMetadata();
+    om.setContentLength(0L);
+    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
+      om.setServerSideEncryption(serverSideEncryptionAlgorithm);
+    }
+    PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, im, om);
+    putObjectRequest.setCannedAcl(cannedACL);
+    s3.putObject(putObjectRequest);
+    statistics.incrementWriteOps(1);
+  }
+
+  /**
+   * Return the number of bytes that large input files should optimally
+   * be split into to minimize I/O time.
+   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
+   */
+  @Deprecated
+  public long getDefaultBlockSize() {
+    // default to 32MB: large enough to minimize the impact of seeks
+    return getConf().getLong("fs.s3a.block.size", 32 * 1024 * 1024);
+  }
+
+  private void printAmazonServiceException(AmazonServiceException ase) {
+    LOG.info("Caught an AmazonServiceException, which means your request made 
it " +
+        "to Amazon S3, but was rejected with an error response for some 
reason.");
+    LOG.info("Error Message: " + ase.getMessage());
+    LOG.info("HTTP Status Code: " + ase.getStatusCode());
+    LOG.info("AWS Error Code: " + ase.getErrorCode());
+    LOG.info("Error Type: " + ase.getErrorType());
+    LOG.info("Request ID: " + ase.getRequestId());
+    LOG.info("Class Name: " + ase.getClass().getName());
+  }
+
+  private void printAmazonClientException(AmazonClientException ace) {
+    LOG.info("Caught an AmazonClientException, which means the client 
encountered " +
+        "a serious internal problem while trying to communicate with S3, " +
+        "such as not being able to access the network.");
+    LOG.info("Error Message: " + ace.getMessage());
+  }
+}
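
The fake-directory handling above (createEmptyObject, createFakeDirectory, deleteUnnecessaryFakeDirectories) rests on one convention: a directory is represented by a zero-byte object whose key ends in "/". A minimal sketch of that convention against the AWS SDK is shown below; it is illustrative only, not part of the patch, and the bucket and key names are placeholders.

    import com.amazonaws.services.s3.AmazonS3Client;
    import com.amazonaws.services.s3.model.ObjectMetadata;
    import com.amazonaws.services.s3.model.PutObjectRequest;

    import java.io.ByteArrayInputStream;

    public class FakeDirMarkerSketch {
      public static void main(String[] args) {
        AmazonS3Client s3 = new AmazonS3Client();   // default credential chain
        String bucket = "example-bucket";           // placeholder bucket
        String dirKey = "data/logs/";               // trailing slash marks a directory

        // Write the zero-byte marker, as createEmptyObject() does.
        ObjectMetadata om = new ObjectMetadata();
        om.setContentLength(0L);
        s3.putObject(new PutObjectRequest(bucket, dirKey,
            new ByteArrayInputStream(new byte[0]), om));

        // Recognize it again, mirroring objectRepresentsDirectory().
        long len = s3.getObjectMetadata(bucket, dirKey).getContentLength();
        System.out.println(dirKey + " is a directory marker: "
            + (dirKey.endsWith("/") && len == 0L));
      }
    }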

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
new file mode 100644
index 0000000..f65a5b0
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.slf4j.Logger;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.net.SocketException;
+
+public class S3AInputStream extends FSInputStream {
+  private long pos;
+  private boolean closed;
+  private S3ObjectInputStream wrappedStream;
+  private S3Object wrappedObject;
+  private FileSystem.Statistics stats;
+  private AmazonS3Client client;
+  private String bucket;
+  private String key;
+  private long contentLength;
+  public static final Logger LOG = S3AFileSystem.LOG;
+
+
+  public S3AInputStream(String bucket, String key, long contentLength, AmazonS3Client client,
+                        FileSystem.Statistics stats) {
+    this.bucket = bucket;
+    this.key = key;
+    this.contentLength = contentLength;
+    this.client = client;
+    this.stats = stats;
+    this.pos = 0;
+    this.closed = false;
+    this.wrappedObject = null;
+    this.wrappedStream = null;
+  }
+
+  private void openIfNeeded() throws IOException {
+    if (wrappedObject == null) {
+      reopen(0);
+    }
+  }
+
+  private synchronized void reopen(long pos) throws IOException {
+    if (wrappedStream != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Aborting old stream to open at pos " + pos);
+      }
+      wrappedStream.abort();
+    }
+
+    if (pos < 0) {
+      throw new EOFException("Trying to seek to a negative offset " + pos);
+    }
+
+    if (contentLength > 0 && pos > contentLength-1) {
+      throw new EOFException("Trying to seek to an offset " + pos + 
+                             " past the end of the file");
+    }
+
+    LOG.info("Actually opening file " + key + " at pos " + pos);
+
+    GetObjectRequest request = new GetObjectRequest(bucket, key);
+    request.setRange(pos, contentLength-1);
+
+    wrappedObject = client.getObject(request);
+    wrappedStream = wrappedObject.getObjectContent();
+
+    if (wrappedStream == null) {
+      throw new IOException("Null IO stream");
+    }
+
+    this.pos = pos;
+  }
+
+  @Override
+  public synchronized long getPos() throws IOException {
+    return pos;
+  }
+
+  @Override
+  public synchronized void seek(long pos) throws IOException {
+    if (this.pos == pos) {
+      return;
+    }
+
+    LOG.info("Reopening " + this.key + " to seek to new offset " + (pos - 
this.pos));
+    reopen(pos);
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+
+    openIfNeeded();
+
+    int byteRead;
+    try {
+      byteRead = wrappedStream.read();
+    } catch (SocketTimeoutException e) {
+      LOG.info("Got timeout while trying to read from stream, trying to 
recover " + e);
+      reopen(pos);
+      byteRead = wrappedStream.read();
+    } catch (SocketException e) {
+      LOG.info("Got socket exception while trying to read from stream, trying 
to recover " + e);
+      reopen(pos);
+      byteRead = wrappedStream.read();
+    }
+
+    if (byteRead >= 0) {
+      pos++;
+    }
+
+    if (stats != null && byteRead >= 0) {
+      stats.incrementBytesRead(1);
+    }
+    return byteRead;
+  }
+
+  @Override
+  public synchronized int read(byte[] buf, int off, int len) throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+
+    openIfNeeded();
+
+    int byteRead;
+    try {
+      byteRead = wrappedStream.read(buf, off, len);
+    } catch (SocketTimeoutException e) {
+      LOG.info("Got timeout while trying to read from stream, trying to 
recover " + e);
+      reopen(pos);
+      byteRead = wrappedStream.read(buf, off, len);
+    } catch (SocketException e) {
+      LOG.info("Got socket exception while trying to read from stream, trying 
to recover " + e);
+      reopen(pos);
+      byteRead = wrappedStream.read(buf, off, len);
+    }
+
+    if (byteRead > 0) {
+      pos += byteRead;
+    }
+
+    if (stats != null && byteRead > 0) {
+      stats.incrementBytesRead(byteRead);
+    }
+
+    return byteRead;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    super.close();
+    closed = true;
+    if (wrappedObject != null) {
+      wrappedObject.close();
+    }
+  }
+
+  @Override
+  public synchronized int available() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+    long remaining = this.contentLength - this.pos;
+    if (remaining > Integer.MAX_VALUE) {
+      return Integer.MAX_VALUE;
+    }
+    return (int)remaining;
+  }
+
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+}
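
S3AInputStream opens the object lazily, issues a ranged GET from the current position on first read or on seek(), and re-opens the stream to recover from socket errors. The sketch below shows how a client exercises that path through the ordinary FileSystem API; it is an illustration rather than code from the patch, and the bucket, path, and offset are placeholders (AWS credentials are assumed to be configured).

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.net.URI;

    public class S3AReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);

        FSDataInputStream in = fs.open(new Path("s3a://example-bucket/data/part-00000"));
        try {
          in.seek(1024L);                  // S3AInputStream reopens with a ranged GET
          byte[] buf = new byte[4096];
          int read = in.read(buf, 0, buf.length);
          System.out.println("read " + read + " bytes, now at pos " + in.getPos());
        } finally {
          in.close();
        }
      }
    }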

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
new file mode 100644
index 0000000..bdb723e
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
+import com.amazonaws.services.s3.transfer.Upload;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.util.Progressable;
+
+import org.slf4j.Logger;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+public class S3AOutputStream extends OutputStream {
+  private OutputStream backupStream;
+  private File backupFile;
+  private boolean closed;
+  private String key;
+  private String bucket;
+  private AmazonS3Client client;
+  private Progressable progress;
+  private long partSize;
+  private int partSizeThreshold;
+  private S3AFileSystem fs;
+  private CannedAccessControlList cannedACL;
+  private FileSystem.Statistics statistics;
+  private LocalDirAllocator lDirAlloc;
+  private String serverSideEncryptionAlgorithm;
+
+  public static final Logger LOG = S3AFileSystem.LOG;
+
+  public S3AOutputStream(Configuration conf, AmazonS3Client client, 
+    S3AFileSystem fs, String bucket, String key, Progressable progress, 
+    CannedAccessControlList cannedACL, FileSystem.Statistics statistics, 
+    String serverSideEncryptionAlgorithm)
+      throws IOException {
+    this.bucket = bucket;
+    this.key = key;
+    this.client = client;
+    this.progress = progress;
+    this.fs = fs;
+    this.cannedACL = cannedACL;
+    this.statistics = statistics;
+    this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
+
+    partSize = conf.getLong(NEW_MULTIPART_SIZE, 
+      conf.getLong(OLD_MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE));
+    partSizeThreshold = conf.getInt(NEW_MIN_MULTIPART_THRESHOLD, 
+      conf.getInt(OLD_MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD));
+
+    if (conf.get(BUFFER_DIR, null) != null) {
+      lDirAlloc = new LocalDirAllocator(BUFFER_DIR);
+    } else {
+      lDirAlloc = new LocalDirAllocator("${hadoop.tmp.dir}/s3a");
+    }
+
+    backupFile = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
+    closed = false;
+
+    LOG.info("OutputStream for key '" + key + "' writing to tempfile: " + 
this.backupFile);
+
+    this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
+  }
+
+  @Override
+  public void flush() throws IOException {
+    backupStream.flush();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+
+    backupStream.close();
+    LOG.info("OutputStream for key '" + key + "' closed. Now beginning 
upload");
+    LOG.info("Minimum upload part size: " + partSize + " threshold " + 
partSizeThreshold);
+
+
+    try {
+      TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
+      transferConfiguration.setMinimumUploadPartSize(partSize);
+      transferConfiguration.setMultipartUploadThreshold(partSizeThreshold);
+
+      TransferManager transfers = new TransferManager(client);
+      transfers.setConfiguration(transferConfiguration);
+
+      final ObjectMetadata om = new ObjectMetadata();
+      if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
+        om.setServerSideEncryption(serverSideEncryptionAlgorithm);
+      }
+      PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, backupFile);
+      putObjectRequest.setCannedAcl(cannedACL);
+      putObjectRequest.setMetadata(om);
+
+      Upload upload = transfers.upload(putObjectRequest);
+
+      ProgressableProgressListener listener = 
+        new ProgressableProgressListener(upload, progress, statistics);
+      upload.addProgressListener(listener);
+
+      upload.waitForUploadResult();
+
+      long delta = upload.getProgress().getBytesTransferred() - listener.getLastBytesTransferred();
+      if (statistics != null && delta != 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("S3A write delta changed after finished: " + delta + " 
bytes");
+        }
+        statistics.incrementBytesWritten(delta);
+      }
+
+      // This will delete unnecessary fake parent directories
+      fs.finishedWrite(key);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } finally {
+      if (!backupFile.delete()) {
+        LOG.warn("Could not delete temporary s3a file: " + backupFile);
+      }
+      super.close();
+      closed = true;
+    }
+
+    LOG.info("OutputStream for key '" + key + "' upload complete");
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    backupStream.write(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    backupStream.write(b, off, len);
+  }
+
+  public static class ProgressableProgressListener implements ProgressListener {
+    private Progressable progress;
+    private FileSystem.Statistics statistics;
+    private long lastBytesTransferred;
+    private Upload upload;
+
+    public ProgressableProgressListener(Upload upload, Progressable progress, 
+      FileSystem.Statistics statistics) {
+      this.upload = upload;
+      this.progress = progress;
+      this.statistics = statistics;
+      this.lastBytesTransferred = 0;
+    }
+
+    public void progressChanged(ProgressEvent progressEvent) {
+      if (progress != null) {
+        progress.progress();
+      }
+
+      // There are 3 http ops here, but this should be close enough for now
+      if (progressEvent.getEventCode() == ProgressEvent.PART_STARTED_EVENT_CODE ||
+          progressEvent.getEventCode() == ProgressEvent.COMPLETED_EVENT_CODE) {
+        statistics.incrementWriteOps(1);
+      }
+
+      long transferred = upload.getProgress().getBytesTransferred();
+      long delta = transferred - lastBytesTransferred;
+      if (statistics != null && delta != 0) {
+        statistics.incrementBytesWritten(delta);
+      }
+
+      lastBytesTransferred = transferred;
+    }
+
+    public long getLastBytesTransferred() {
+      return lastBytesTransferred;
+    }
+  }
+}
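
S3AOutputStream buffers everything to a local temporary file (under the configured buffer directory, falling back to ${hadoop.tmp.dir}/s3a) and uploads the whole object through TransferManager only when close() returns, so nothing becomes visible in the bucket before then. A hedged usage sketch through the FileSystem API follows; it is not part of the patch, and the bucket and path are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.net.URI;
    import java.nio.charset.Charset;

    public class S3AWriteSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);

        FSDataOutputStream out = fs.create(new Path("s3a://example-bucket/out/hello.txt"), true);
        try {
          // Data lands in the local buffer file, not yet in S3.
          out.write("hello, s3a".getBytes(Charset.forName("UTF-8")));
        } finally {
          out.close();   // the whole object is uploaded here via TransferManager
        }
      }
    }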

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
 
b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
index 3cd1d6b..0e3c42a 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
+++ 
b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
@@ -15,3 +15,4 @@
 
 org.apache.hadoop.fs.s3.S3FileSystem
 org.apache.hadoop.fs.s3native.NativeS3FileSystem
+org.apache.hadoop.fs.s3a.S3AFileSystem
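
With the service entry above, hadoop-aws registers S3AFileSystem through the JDK ServiceLoader mechanism, so FileSystem.get() can resolve the s3a:// scheme without an explicit fs.s3a.impl setting, provided the jar is on the classpath. A small sketch (the bucket name is a placeholder):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    import java.net.URI;

    public class S3ASchemeSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), new Configuration());
        // Expected to print org.apache.hadoop.fs.s3a.S3AFileSystem
        System.out.println(fs.getClass().getName());
      }
    }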

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java
new file mode 100644
index 0000000..cbdb3bd
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractBondedFSContract;
+
+/**
+ * The contract of S3A: only enabled if the test bucket is provided
+ */
+public class S3AContract extends AbstractBondedFSContract {
+
+  public static final String CONTRACT_XML = "contract/s3a.xml";
+
+
+  public S3AContract(Configuration conf) {
+    super(conf);
+    //insert the base features
+    addConfResource(CONTRACT_XML);
+  }
+
+  @Override
+  public String getScheme() {
+    return "s3a";
+  }
+
+}
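
S3AContract extends AbstractBondedFSContract, so the contract tests that use it only run when a target test filesystem is bound through configuration and skip themselves otherwise. The sketch below is illustrative only: the option name fs.contract.test.fs.s3a follows the bonded-contract naming convention and, like the bucket, is an assumption here; in practice the binding comes from a test resource file rather than code.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.contract.s3a.S3AContract;

    public class S3AContractBindingSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Assumed option name and placeholder bucket; normally supplied via test XML.
        conf.set("fs.contract.test.fs.s3a", "s3a://example-test-bucket/");

        S3AContract contract = new S3AContract(conf);
        System.out.println("contract scheme: " + contract.getScheme());
      }
    }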

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractCreate.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractCreate.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractCreate.java
new file mode 100644
index 0000000..1d95ddf
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractCreate.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+public class TestS3AContractCreate extends AbstractContractCreateTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+
+  @Override
+  public void testOverwriteEmptyDirectory() throws Throwable {
+    ContractTestUtils.skip(
+        "blobstores can't distinguish empty directories from files");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractDelete.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractDelete.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractDelete.java
new file mode 100644
index 0000000..733a517
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractDelete.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractDeleteTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class TestS3AContractDelete extends AbstractContractDeleteTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractMkdir.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractMkdir.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractMkdir.java
new file mode 100644
index 0000000..a312782
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractMkdir.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractMkdirTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Test dir operations on S3
+ */
+public class TestS3AContractMkdir extends AbstractContractMkdirTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractOpen.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractOpen.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractOpen.java
new file mode 100644
index 0000000..f735deb
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractOpen.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class TestS3AContractOpen extends AbstractContractOpenTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRename.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRename.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRename.java
new file mode 100644
index 0000000..88ed6d6
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRename.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+
+public class TestS3AContractRename extends AbstractContractRenameTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+
+  @Override
+  public void testRenameDirIntoExistingDir() throws Throwable {
+    describe("Verify renaming a dir into an existing dir puts the files"
+             +" from the source dir into the existing dir"
+             +" and leaves existing files alone");
+    FileSystem fs = getFileSystem();
+    String sourceSubdir = "source";
+    Path srcDir = path(sourceSubdir);
+    Path srcFilePath = new Path(srcDir, "source-256.txt");
+    byte[] srcDataset = dataset(256, 'a', 'z');
+    writeDataset(fs, srcFilePath, srcDataset, srcDataset.length, 1024, false);
+    Path destDir = path("dest");
+
+    Path destFilePath = new Path(destDir, "dest-512.txt");
+    byte[] destDataset = dataset(512, 'A', 'Z');
+    writeDataset(fs, destFilePath, destDataset, destDataset.length, 1024, false);
+    assertIsFile(destFilePath);
+
+    boolean rename = fs.rename(srcDir, destDir);
+    Path renamedSrcFilePath = new Path(destDir, "source-256.txt");
+    assertIsFile(destFilePath);
+    assertIsFile(renamedSrcFilePath);
+    ContractTestUtils.verifyFileContents(fs, destFilePath, destDataset);
+    assertTrue("rename returned false though the contents were copied", 
rename);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRootDir.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRootDir.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRootDir.java
new file mode 100644
index 0000000..5e2352c
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRootDir.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Root directory operations against an S3 bucket.
+ */
+public class TestS3AContractRootDir extends
+    AbstractContractRootDirectoryTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+}
