HADOOP-14999. AliyunOSS: provide one asynchronous multi-part based uploading mechanism. Contributed by Genmao Yu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6542d17e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6542d17e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6542d17e

Branch: refs/heads/HDFS-12943
Commit: 6542d17ea460ec222137c4b275b13daf15d3fca3
Parents: 2216bde
Author: Sammi Chen <sammi.c...@intel.com>
Authored: Fri Mar 30 20:23:05 2018 +0800
Committer: Sammi Chen <sammi.c...@intel.com>
Committed: Fri Mar 30 20:23:05 2018 +0800

----------------------------------------------------------------------
 .../aliyun/oss/AliyunCredentialsProvider.java   |   3 +-
 .../aliyun/oss/AliyunOSSBlockOutputStream.java  | 206 +++++++++++++++++++
 .../fs/aliyun/oss/AliyunOSSFileSystem.java      |  34 ++-
 .../fs/aliyun/oss/AliyunOSSFileSystemStore.java | 173 ++++++++--------
 .../fs/aliyun/oss/AliyunOSSOutputStream.java    | 111 ----------
 .../hadoop/fs/aliyun/oss/AliyunOSSUtils.java    | 115 ++++++++---
 .../apache/hadoop/fs/aliyun/oss/Constants.java  |  22 +-
 .../oss/TestAliyunOSSBlockOutputStream.java     | 115 +++++++++++
 .../fs/aliyun/oss/TestAliyunOSSInputStream.java |  10 +-
 .../aliyun/oss/TestAliyunOSSOutputStream.java   |  91 --------
 .../contract/TestAliyunOSSContractDistCp.java   |   2 +-
 11 files changed, 544 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
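
The new write path is exercised through the ordinary FileSystem API: create()
now returns a stream backed by AliyunOSSBlockOutputStream. Below is a minimal
sketch of a client writing a large object, assuming an OSS bucket URI and using
the part-size key introduced in this patch (the bucket name, object path, and
sizes are placeholders, not part of the change):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  import java.net.URI;

  public class OssBlockUploadSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Roll over to a new part every 8 MB instead of the 100 MB default.
      conf.setLong("fs.oss.multipart.upload.size", 8 * 1024 * 1024);

      FileSystem fs = FileSystem.get(URI.create("oss://example-bucket/"), conf);
      byte[] chunk = new byte[1024 * 1024];
      try (FSDataOutputStream out = fs.create(new Path("/big-object"))) {
        for (int i = 0; i < 32; i++) {
          out.write(chunk);   // buffered to local disk; full parts are
        }                     // uploaded asynchronously in the background
      }                       // close() uploads the last part and completes
    }
  }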


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6542d17e/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java
index b46c67a..58c14a9 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java
@@ -35,8 +35,7 @@ import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
 public class AliyunCredentialsProvider implements CredentialsProvider {
   private Credentials credentials = null;
 
-  public AliyunCredentialsProvider(Configuration conf)
-      throws IOException {
+  public AliyunCredentialsProvider(Configuration conf) throws IOException {
     String accessKeyId;
     String accessKeySecret;
     String securityToken;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6542d17e/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java
new file mode 100644
index 0000000..12d551b
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import com.aliyun.oss.model.PartETag;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Asynchronous multi-part based uploading mechanism to support huge file
+ * which is larger than 5GB. Data will be buffered on local disk, then uploaded
+ * to OSS in {@link #close()} method.
+ */
+public class AliyunOSSBlockOutputStream extends OutputStream {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AliyunOSSBlockOutputStream.class);
+  private AliyunOSSFileSystemStore store;
+  private Configuration conf;
+  private boolean closed;
+  private String key;
+  private File blockFile;
+  private List<File> blockFiles = new ArrayList<>();
+  private long blockSize;
+  private int blockId = 0;
+  private long blockWritten = 0L;
+  private String uploadId = null;
+  private final List<ListenableFuture<PartETag>> partETagsFutures;
+  private final ListeningExecutorService executorService;
+  private OutputStream blockStream;
+  private final byte[] singleByte = new byte[1];
+
+  public AliyunOSSBlockOutputStream(Configuration conf,
+      AliyunOSSFileSystemStore store,
+      String key,
+      Long blockSize,
+      ExecutorService executorService) throws IOException {
+    this.store = store;
+    this.conf = conf;
+    this.key = key;
+    this.blockSize = blockSize;
+    this.blockFile = newBlockFile();
+    this.blockStream =
+        new BufferedOutputStream(new FileOutputStream(blockFile));
+    this.partETagsFutures = new ArrayList<>(2);
+    this.executorService = MoreExecutors.listeningDecorator(executorService);
+  }
+
+  private File newBlockFile() throws IOException {
+    return AliyunOSSUtils.createTmpFileForWrite(
+        String.format("oss-block-%04d-", blockId), blockSize, conf);
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    blockStream.flush();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+
+    blockStream.flush();
+    blockStream.close();
+    if (!blockFiles.contains(blockFile)) {
+      blockFiles.add(blockFile);
+    }
+
+    try {
+      if (blockFiles.size() == 1) {
+        // just upload it directly
+        store.uploadObject(key, blockFile);
+      } else {
+        if (blockWritten > 0) {
+          ListenableFuture<PartETag> partETagFuture =
+              executorService.submit(() -> {
+                PartETag partETag = store.uploadPart(blockFile, key, uploadId,
+                    blockId + 1);
+                return partETag;
+              });
+          partETagsFutures.add(partETagFuture);
+        }
+        // wait for the partial uploads to finish
+        final List<PartETag> partETags = waitForAllPartUploads();
+        if (null == partETags) {
+          throw new IOException("Failed to multipart upload to oss, abort it.");
+        }
+        store.completeMultipartUpload(key, uploadId, partETags);
+      }
+    } finally {
+      for (File tFile: blockFiles) {
+        if (tFile.exists() && !tFile.delete()) {
+          LOG.warn("Failed to delete temporary file {}", tFile);
+        }
+      }
+      closed = true;
+    }
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    singleByte[0] = (byte)b;
+    write(singleByte, 0, 1);
+  }
+
+  @Override
+  public synchronized void write(byte[] b, int off, int len)
+      throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed.");
+    }
+    try {
+      blockStream.write(b, off, len);
+      blockWritten += len;
+      if (blockWritten >= blockSize) {
+        uploadCurrentPart();
+        blockWritten = 0L;
+      }
+    } finally {
+      for (File tFile: blockFiles) {
+        if (tFile.exists() && !tFile.delete()) {
+          LOG.warn("Failed to delete temporary file {}", tFile);
+        }
+      }
+    }
+  }
+
+  private void uploadCurrentPart() throws IOException {
+    blockFiles.add(blockFile);
+    blockStream.flush();
+    blockStream.close();
+    if (blockId == 0) {
+      uploadId = store.getUploadId(key);
+    }
+    ListenableFuture<PartETag> partETagFuture =
+        executorService.submit(() -> {
+          PartETag partETag = store.uploadPart(blockFile, key, uploadId,
+              blockId + 1);
+          return partETag;
+        });
+    partETagsFutures.add(partETagFuture);
+    blockFile = newBlockFile();
+    blockId++;
+    blockStream = new BufferedOutputStream(new FileOutputStream(blockFile));
+  }
+
+  /**
+   * Block awaiting all outstanding uploads to complete.
+   * @return list of results
+   * @throws IOException IO Problems
+   */
+  private List<PartETag> waitForAllPartUploads() throws IOException {
+    LOG.debug("Waiting for {} uploads to complete", partETagsFutures.size());
+    try {
+      return Futures.allAsList(partETagsFutures).get();
+    } catch (InterruptedException ie) {
+      LOG.warn("Interrupted partUpload", ie);
+      Thread.currentThread().interrupt();
+      return null;
+    } catch (ExecutionException ee) {
+      //there is no way of recovering so abort
+      //cancel all partUploads
+      LOG.debug("While waiting for upload completion", ee);
+      LOG.debug("Cancelling futures");
+      for (ListenableFuture<PartETag> future : partETagsFutures) {
+        future.cancel(true);
+      }
+      //abort multipartupload
+      store.abortMultipartUpload(key, uploadId);
+      throw new IOException("Multi-part upload with id '" + uploadId
+        + "' to " + key, ee);
+    }
+  }
+}

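The stream above couples local-disk buffering with Guava's listening executor:
each full block is handed off as a part upload while write() keeps filling the
next block, and close() joins on all outstanding futures. The pattern,
distilled into a standalone sketch (uploadPart here is a stand-in for the
store call, not the real signature):

  import com.google.common.util.concurrent.Futures;
  import com.google.common.util.concurrent.ListenableFuture;
  import com.google.common.util.concurrent.ListeningExecutorService;
  import com.google.common.util.concurrent.MoreExecutors;

  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.Executors;

  public class AsyncPartsSketch {
    public static void main(String[] args) throws Exception {
      ListeningExecutorService pool =
          MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));
      List<ListenableFuture<String>> parts = new ArrayList<>();
      for (int id = 1; id <= 3; id++) {
        final int partNumber = id;
        // write() submits each completed block and immediately moves on.
        parts.add(pool.submit(() -> uploadPart(partNumber)));
      }
      try {
        // close() blocks here until every outstanding part is done.
        System.out.println(Futures.allAsList(parts).get());
      } catch (ExecutionException e) {
        // The real stream cancels remaining futures and aborts the upload.
        parts.forEach(f -> f.cancel(true));
      } finally {
        pool.shutdown();
      }
    }

    private static String uploadPart(int partNumber) {
      return "etag-" + partNumber;   // stand-in for store.uploadPart(...)
    }
  }
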
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6542d17e/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
index b3c63d3..93e31d5 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.intOption;
+import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.longOption;
 import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.objectRepresentsDirectory;
 import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
 
@@ -69,6 +71,7 @@ public class AliyunOSSFileSystem extends FileSystem {
   private URI uri;
   private String bucket;
   private Path workingDir;
+  private int blockOutputActiveBlocks;
   private AliyunOSSFileSystemStore store;
   private int maxKeys;
   private int maxReadAheadPartNumber;
@@ -125,8 +128,15 @@ public class AliyunOSSFileSystem extends FileSystem {
       // this means the file is not found
     }
 
-    return new FSDataOutputStream(new AliyunOSSOutputStream(getConf(),
-        store, key, progress, statistics), (Statistics)(null));
+    long uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(getConf(),
+        MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
+    return new FSDataOutputStream(
+        new AliyunOSSBlockOutputStream(getConf(),
+            store,
+            key,
+            uploadPartSize,
+            new SemaphoredDelegatingExecutor(boundedThreadPool,
+                blockOutputActiveBlocks, true)), (Statistics)(null));
   }
 
   /**
@@ -149,9 +159,8 @@ public class AliyunOSSFileSystem extends FileSystem {
         throw new FileAlreadyExistsException("Not a directory: " + parent);
       }
     }
-    return create(path, permission,
-      flags.contains(CreateFlag.OVERWRITE), bufferSize,
-      replication, blockSize, progress);
+    return create(path, permission, flags.contains(CreateFlag.OVERWRITE),
+        bufferSize, replication, blockSize, progress);
   }
 
   @Override
@@ -270,7 +279,7 @@ public class AliyunOSSFileSystem extends FileSystem {
       }
     } else if (objectRepresentsDirectory(key, meta.getContentLength())) {
       return new FileStatus(0, true, 1, 0, meta.getLastModified().getTime(),
-           qualifiedPath);
+          qualifiedPath);
     } else {
       return new FileStatus(meta.getContentLength(), false, 1,
           getDefaultBlockSize(path), meta.getLastModified().getTime(),
@@ -318,6 +327,10 @@ public class AliyunOSSFileSystem extends FileSystem {
     uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority());
     workingDir = new Path("/user",
         System.getProperty("user.name")).makeQualified(uri, null);
+    long keepAliveTime = longOption(conf,
+        KEEPALIVE_TIME_KEY, KEEPALIVE_TIME_DEFAULT, 0);
+    blockOutputActiveBlocks = intOption(conf,
+        UPLOAD_ACTIVE_BLOCKS_KEY, UPLOAD_ACTIVE_BLOCKS_DEFAULT, 1);
 
     store = new AliyunOSSFileSystemStore();
     store.initialize(name, conf, statistics);
@@ -335,7 +348,8 @@ public class AliyunOSSFileSystem extends FileSystem {
         Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT);
 
     this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
-        threadNum, totalTasks, 60L, TimeUnit.SECONDS, "oss-read-shared");
+        threadNum, totalTasks, keepAliveTime, TimeUnit.SECONDS,
+        "oss-transfer-shared");
 
     maxConcurrentCopyTasksPerDir = AliyunOSSUtils.intPositiveOption(conf,
         Constants.MAX_CONCURRENT_COPY_TASKS_PER_DIR_KEY,
@@ -490,12 +504,12 @@ public class AliyunOSSFileSystem extends FileSystem {
     if (status.isFile()) {
       LOG.debug("{} is a File", qualifiedPath);
       final BlockLocation[] locations = getFileBlockLocations(status,
-        0, status.getLen());
+          0, status.getLen());
       return store.singleStatusRemoteIterator(filter.accept(f) ? status : null,
-        locations);
+          locations);
     } else {
       return store.createLocatedFileStatusIterator(key, maxKeys, this, filter,
-        acceptor, recursive ? null : "/");
+          acceptor, recursive ? null : "/");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6542d17e/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
index a7f13c0..cc050c8 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.fs.aliyun.oss;
 
 import com.aliyun.oss.ClientConfiguration;
@@ -62,8 +63,11 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.Serializable;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.NoSuchElementException;
@@ -83,7 +87,6 @@ public class AliyunOSSFileSystemStore {
   private String bucketName;
   private long uploadPartSize;
   private long multipartThreshold;
-  private long partSize;
   private int maxKeys;
   private String serverSideEncryptionAlgorithm;
 
@@ -143,28 +146,18 @@ public class AliyunOSSFileSystemStore {
     String endPoint = conf.getTrimmed(ENDPOINT_KEY, "");
     if (StringUtils.isEmpty(endPoint)) {
       throw new IllegalArgumentException("Aliyun OSS endpoint should not be " +
-        "null or empty. Please set proper endpoint with 'fs.oss.endpoint'.");
+          "null or empty. Please set proper endpoint with 'fs.oss.endpoint'.");
     }
     CredentialsProvider provider =
         AliyunOSSUtils.getCredentialsProvider(conf);
     ossClient = new OSSClient(endPoint, provider, clientConf);
-    uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
-        MULTIPART_UPLOAD_SIZE_DEFAULT);
+    uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(conf,
+        MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
     multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
         MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
-    partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
-        MULTIPART_UPLOAD_SIZE_DEFAULT);
-    if (partSize < MIN_MULTIPART_UPLOAD_PART_SIZE) {
-      partSize = MIN_MULTIPART_UPLOAD_PART_SIZE;
-    }
     serverSideEncryptionAlgorithm =
         conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, "");
 
-    if (uploadPartSize < 5 * 1024 * 1024) {
-      LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB");
-      uploadPartSize = 5 * 1024 * 1024;
-    }
-
     if (multipartThreshold < 5 * 1024 * 1024) {
       LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB");
       multipartThreshold = 5 * 1024 * 1024;
@@ -420,71 +413,6 @@ public class AliyunOSSFileSystemStore {
   }
 
   /**
-   * Upload a file as an OSS object, using multipart upload.
-   *
-   * @param key object key.
-   * @param file local file to upload.
-   * @throws IOException if failed to upload object.
-   */
-  public void multipartUploadObject(String key, File file) throws IOException {
-    File object = file.getAbsoluteFile();
-    long dataLen = object.length();
-    long realPartSize = AliyunOSSUtils.calculatePartSize(dataLen, partSize);
-    int partNum = (int) (dataLen / realPartSize);
-    if (dataLen % realPartSize != 0) {
-      partNum += 1;
-    }
-
-    InitiateMultipartUploadRequest initiateMultipartUploadRequest =
-        new InitiateMultipartUploadRequest(bucketName, key);
-    ObjectMetadata meta = new ObjectMetadata();
-    if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
-      meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
-    }
-    initiateMultipartUploadRequest.setObjectMetadata(meta);
-    InitiateMultipartUploadResult initiateMultipartUploadResult =
-        ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
-    List<PartETag> partETags = new ArrayList<PartETag>();
-    String uploadId = initiateMultipartUploadResult.getUploadId();
-
-    try {
-      for (int i = 0; i < partNum; i++) {
-        // TODO: Optimize this, avoid opening the object multiple times
-        FileInputStream fis = new FileInputStream(object);
-        try {
-          long skipBytes = realPartSize * i;
-          AliyunOSSUtils.skipFully(fis, skipBytes);
-          long size = (realPartSize < dataLen - skipBytes) ?
-              realPartSize : dataLen - skipBytes;
-          UploadPartRequest uploadPartRequest = new UploadPartRequest();
-          uploadPartRequest.setBucketName(bucketName);
-          uploadPartRequest.setKey(key);
-          uploadPartRequest.setUploadId(uploadId);
-          uploadPartRequest.setInputStream(fis);
-          uploadPartRequest.setPartSize(size);
-          uploadPartRequest.setPartNumber(i + 1);
-          UploadPartResult uploadPartResult =
-              ossClient.uploadPart(uploadPartRequest);
-          statistics.incrementWriteOps(1);
-          partETags.add(uploadPartResult.getPartETag());
-        } finally {
-          fis.close();
-        }
-      }
-      CompleteMultipartUploadRequest completeMultipartUploadRequest =
-          new CompleteMultipartUploadRequest(bucketName, key,
-              uploadId, partETags);
-      CompleteMultipartUploadResult completeMultipartUploadResult =
-          ossClient.completeMultipartUpload(completeMultipartUploadRequest);
-      LOG.debug(completeMultipartUploadResult.getETag());
-    } catch (OSSException | ClientException e) {
-      AbortMultipartUploadRequest abortMultipartUploadRequest =
-          new AbortMultipartUploadRequest(bucketName, key, uploadId);
-      ossClient.abortMultipartUpload(abortMultipartUploadRequest);
-    }
-  }
-
-  /**
    * list objects.
    *
    * @param prefix prefix.
@@ -494,7 +422,7 @@ public class AliyunOSSFileSystemStore {
    * @return a list of matches.
    */
   public ObjectListing listObjects(String prefix, int maxListingLength,
-                                   String marker, boolean recursive) {
+      String marker, boolean recursive) {
     String delimiter = recursive ? null : "/";
     prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix);
     ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
@@ -605,7 +533,7 @@ public class AliyunOSSFileSystemStore {
         if (hasNext()) {
           FileStatus status = batchIterator.next();
           BlockLocation[] locations = fs.getFileBlockLocations(status,
-            0, status.getLen());
+              0, status.getLen());
           return new LocatedFileStatus(
               status, status.isFile() ? locations : null);
         } else {
@@ -626,7 +554,7 @@ public class AliyunOSSFileSystemStore {
         List<FileStatus> stats = new ArrayList<>(
             listing.getObjectSummaries().size() +
             listing.getCommonPrefixes().size());
-        for(OSSObjectSummary summary: listing.getObjectSummaries()) {
+        for (OSSObjectSummary summary : listing.getObjectSummaries()) {
           String key = summary.getKey();
           Path path = fs.makeQualified(new Path("/" + key));
           if (filter.accept(path) && acceptor.accept(path, summary)) {
@@ -637,7 +565,7 @@ public class AliyunOSSFileSystemStore {
           }
         }
 
-        for(String commonPrefix: listing.getCommonPrefixes()) {
+        for (String commonPrefix : listing.getCommonPrefixes()) {
           Path path = fs.makeQualified(new Path("/" + commonPrefix));
           if (filter.accept(path) && acceptor.accept(path, commonPrefix)) {
             FileStatus status = new FileStatus(0, true, 1, 0, 0, path);
@@ -656,4 +584,83 @@ public class AliyunOSSFileSystemStore {
       }
     };
   }
+
+  public PartETag uploadPart(File file, String key, String uploadId, int idx)
+      throws IOException {
+    InputStream instream = null;
+    Exception caught = null;
+    int tries = 3;
+    while (tries > 0) {
+      try {
+        instream = new FileInputStream(file);
+        UploadPartRequest uploadRequest = new UploadPartRequest();
+        uploadRequest.setBucketName(bucketName);
+        uploadRequest.setKey(key);
+        uploadRequest.setUploadId(uploadId);
+        uploadRequest.setInputStream(instream);
+        uploadRequest.setPartSize(file.length());
+        uploadRequest.setPartNumber(idx);
+        UploadPartResult uploadResult = ossClient.uploadPart(uploadRequest);
+        return uploadResult.getPartETag();
+      } catch (Exception e) {
+        LOG.debug("Failed to upload "+ file.getPath() +", " +
+            "try again.", e);
+        caught = e;
+      } finally {
+        if (instream != null) {
+          instream.close();
+          instream = null;
+        }
+      }
+      tries--;
+    }
+
+    assert (caught != null);
+    throw new IOException("Failed to upload " + file.getPath() +
+        " for 3 times.", caught);
+  }
+
+  /**
+   * Initiate multipart upload.
+   */
+  public String getUploadId(String key) {
+    InitiateMultipartUploadRequest initiateMultipartUploadRequest =
+        new InitiateMultipartUploadRequest(bucketName, key);
+    InitiateMultipartUploadResult initiateMultipartUploadResult =
+        ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
+    return initiateMultipartUploadResult.getUploadId();
+  }
+
+  /**
+   * Complete the specific multipart upload.
+   */
+  public CompleteMultipartUploadResult completeMultipartUpload(String key,
+      String uploadId, List<PartETag> partETags) {
+    Collections.sort(partETags, new PartNumberAscendComparator());
+    CompleteMultipartUploadRequest completeMultipartUploadRequest =
+        new CompleteMultipartUploadRequest(bucketName, key, uploadId,
+            partETags);
+    return ossClient.completeMultipartUpload(completeMultipartUploadRequest);
+  }
+
+  /**
+   * Abort the specific multipart upload.
+   */
+  public void abortMultipartUpload(String key, String uploadId) {
+    AbortMultipartUploadRequest request = new AbortMultipartUploadRequest(
+        bucketName, key, uploadId);
+    ossClient.abortMultipartUpload(request);
+  }
+
+  private static class PartNumberAscendComparator
+      implements Comparator<PartETag>, Serializable {
+    @Override
+    public int compare(PartETag o1, PartETag o2) {
+      if (o1.getPartNumber() > o2.getPartNumber()) {
+        return 1;
+      } else {
+        return -1;
+      }
+    }
+  }
 }

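Taken together, getUploadId, uploadPart, completeMultipartUpload and
abortMultipartUpload give callers the full multipart lifecycle. A hedged
sketch of the intended call order, using the public signatures from this
patch (the wrapper class, key, and part files are illustrative):

  import com.aliyun.oss.model.PartETag;
  import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore;

  import java.io.File;
  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.List;

  public class MultipartLifecycleSketch {
    static void uploadInParts(AliyunOSSFileSystemStore store, String key,
        File block1, File block2) throws IOException {
      String uploadId = store.getUploadId(key);           // initiate
      List<PartETag> tags = new ArrayList<>();
      try {
        tags.add(store.uploadPart(block1, key, uploadId, 1));
        tags.add(store.uploadPart(block2, key, uploadId, 2));
        // completeMultipartUpload sorts the tags by part number, so they
        // may be collected in any order (e.g. from concurrent uploads).
        store.completeMultipartUpload(key, uploadId, tags);
      } catch (IOException e) {
        store.abortMultipartUpload(key, uploadId);        // clean up on failure
        throw e;
      }
    }
  }
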
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6542d17e/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java
deleted file mode 100644
index c75ee18..0000000
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.aliyun.oss;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.util.Progressable;
-
-import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
-
-/**
- * The output stream for OSS blob system.
- * Data will be buffered on local disk, then uploaded to OSS in
- * {@link #close()} method.
- */
-public class AliyunOSSOutputStream extends OutputStream {
-  public static final Log LOG = LogFactory.getLog(AliyunOSSOutputStream.class);
-  private AliyunOSSFileSystemStore store;
-  private final String key;
-  private Statistics statistics;
-  private Progressable progress;
-  private long partSizeThreshold;
-  private LocalDirAllocator dirAlloc;
-  private boolean closed;
-  private File tmpFile;
-  private BufferedOutputStream backupStream;
-
-  public AliyunOSSOutputStream(Configuration conf,
-      AliyunOSSFileSystemStore store, String key, Progressable progress,
-      Statistics statistics) throws IOException {
-    this.store = store;
-    this.key = key;
-    // The caller cann't get any progress information
-    this.progress = progress;
-    this.statistics = statistics;
-    partSizeThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
-        MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
-
-    if (conf.get(BUFFER_DIR_KEY) == null) {
-      conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss");
-    }
-    dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
-
-    tmpFile = dirAlloc.createTmpFileForWrite("output-",
-        LocalDirAllocator.SIZE_UNKNOWN, conf);
-    backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile));
-    closed = false;
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    if (closed) {
-      return;
-    }
-    closed = true;
-    if (backupStream != null) {
-      backupStream.close();
-    }
-    long dataLen = tmpFile.length();
-    try {
-      if (dataLen <= partSizeThreshold) {
-        store.uploadObject(key, tmpFile);
-      } else {
-        store.multipartUploadObject(key, tmpFile);
-      }
-    } finally {
-      if (!tmpFile.delete()) {
-        LOG.warn("Can not delete file: " + tmpFile);
-      }
-    }
-  }
-
-
-
-  @Override
-  public synchronized void flush() throws IOException {
-    backupStream.flush();
-  }
-
-  @Override
-  public synchronized void write(int b) throws IOException {
-    backupStream.write(b);
-    statistics.incrementBytesWritten(1);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6542d17e/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
index 1a21608..2fe06c1 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
@@ -18,12 +18,14 @@
 
 package org.apache.hadoop.fs.aliyun.oss;
 
+import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 
 import com.aliyun.oss.common.auth.CredentialsProvider;
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.security.ProviderUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,6 +38,7 @@ import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
 final public class AliyunOSSUtils {
   private static final Logger LOG =
       LoggerFactory.getLogger(AliyunOSSUtils.class);
+  private static LocalDirAllocator directoryAllocator;
 
   private AliyunOSSUtils() {
   }
@@ -75,31 +78,6 @@ final public class AliyunOSSUtils {
   }
 
   /**
-   * Skip the requested number of bytes or fail if there are no enough bytes
-   * left. This allows for the possibility that {@link InputStream#skip(long)}
-   * may not skip as many bytes as requested (most likely because of reaching
-   * EOF).
-   *
-   * @param is the input stream to skip.
-   * @param n the number of bytes to skip.
-   * @throws IOException thrown when skipped less number of bytes.
-   */
-  public static void skipFully(InputStream is, long n) throws IOException {
-    long total = 0;
-    long cur = 0;
-
-    do {
-      cur = is.skip(n - total);
-      total += cur;
-    } while((total < n) && (cur > 0));
-
-    if (total < n) {
-      throw new IOException("Failed to skip " + n + " bytes, possibly due " +
-              "to EOF.");
-    }
-  }
-
-  /**
    * Calculate a proper size of multipart piece. If <code>minPartSize</code>
    * is too small, the number of multipart pieces may exceed the limit of
    * {@link Constants#MULTIPART_UPLOAD_PART_NUM_LIMIT}.
@@ -126,7 +104,7 @@ final public class AliyunOSSUtils {
       throws IOException {
     CredentialsProvider credentials;
 
-    String className = conf.getTrimmed(ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY);
+    String className = conf.getTrimmed(CREDENTIALS_PROVIDER_KEY);
     if (StringUtils.isEmpty(className)) {
       Configuration newConf =
           ProviderUtils.excludeIncompatibleCredentialProviders(conf,
@@ -151,7 +129,7 @@ final public class AliyunOSSUtils {
         throw new IOException(String.format("%s constructor exception.  A " +
             "class specified in %s must provide an accessible constructor " +
             "accepting URI and Configuration, or an accessible default " +
-            "constructor.", className, ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY),
+            "constructor.", className, CREDENTIALS_PROVIDER_KEY),
             e);
       } catch (ReflectiveOperationException | IllegalArgumentException e) {
         throw new IOException(className + " instantiation exception.", e);
@@ -188,4 +166,85 @@ final public class AliyunOSSUtils {
       final long size) {
     return StringUtils.isNotEmpty(name) && name.endsWith("/") && size == 0L;
   }
+
+  /**
+   * Demand create the directory allocator, then create a temporary file.
+   *  @param path prefix for the temporary file
+   *  @param size the size of the file that is going to be written
+   *  @param conf the Configuration object
+   *  @return a unique temporary file
+   *  @throws IOException IO problems
+   */
+  public static File createTmpFileForWrite(String path, long size,
+      Configuration conf) throws IOException {
+    if (conf.get(BUFFER_DIR_KEY) == null) {
+      conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss");
+    }
+    if (directoryAllocator == null) {
+      directoryAllocator = new LocalDirAllocator(BUFFER_DIR_KEY);
+    }
+    return directoryAllocator.createTmpFileForWrite(path, size, conf);
+  }
+
+  /**
+   * Get a integer option >= the minimum allowed value.
+   * @param conf configuration
+   * @param key key to look up
+   * @param defVal default value
+   * @param min minimum value
+   * @return the value
+   * @throws IllegalArgumentException if the value is below the minimum
+   */
+  static int intOption(Configuration conf, String key, int defVal, int min) {
+    int v = conf.getInt(key, defVal);
+    Preconditions.checkArgument(v >= min,
+        String.format("Value of %s: %d is below the minimum value %d",
+            key, v, min));
+    LOG.debug("Value of {} is {}", key, v);
+    return v;
+  }
+
+  /**
+   * Get a long option >= the minimum allowed value.
+   * @param conf configuration
+   * @param key key to look up
+   * @param defVal default value
+   * @param min minimum value
+   * @return the value
+   * @throws IllegalArgumentException if the value is below the minimum
+   */
+  static long longOption(Configuration conf, String key, long defVal,
+      long min) {
+    long v = conf.getLong(key, defVal);
+    Preconditions.checkArgument(v >= min,
+        String.format("Value of %s: %d is below the minimum value %d",
+            key, v, min));
+    LOG.debug("Value of {} is {}", key, v);
+    return v;
+  }
+
+  /**
+   * Get a size property from the configuration: this property must
+   * be at least equal to {@link Constants#MULTIPART_MIN_SIZE}.
+   * If it is too small, it is rounded up to that minimum, and a warning
+   * printed.
+   * @param conf configuration
+   * @param property property name
+   * @param defVal default value
+   * @return the value, guaranteed to be above the minimum size
+   */
+  public static long getMultipartSizeProperty(Configuration conf,
+      String property, long defVal) {
+    long partSize = conf.getLong(property, defVal);
+    if (partSize < MULTIPART_MIN_SIZE) {
+      LOG.warn("{} must be at least 100 KB; configured value is {}",
+          property, partSize);
+      partSize = MULTIPART_MIN_SIZE;
+    } else if (partSize > Integer.MAX_VALUE) {
+      LOG.warn("oss: {} capped to ~2.14GB(maximum allowed size with " +
+          "current output mechanism)", MULTIPART_UPLOAD_PART_SIZE_KEY);
+      partSize = Integer.MAX_VALUE;
+    }
+    return partSize;
+  }
 }

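The clamping behavior of getMultipartSizeProperty, illustrated (values follow
the patch: a 100 KB floor from MULTIPART_MIN_SIZE and an Integer.MAX_VALUE
ceiling; the sketch class itself is hypothetical):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils;

  public class PartSizeClampSketch {
    public static void main(String[] args) {
      Configuration conf = new Configuration();

      conf.setLong("fs.oss.multipart.upload.size", 64 * 1024);   // 64 KB
      long small = AliyunOSSUtils.getMultipartSizeProperty(conf,
          "fs.oss.multipart.upload.size", 104857600);
      // small == 100 * 1024: rounded up to the minimum, with a warning logged

      conf.setLong("fs.oss.multipart.upload.size", 3L * 1024 * 1024 * 1024);
      long huge = AliyunOSSUtils.getMultipartSizeProperty(conf,
          "fs.oss.multipart.upload.size", 104857600);
      // huge == Integer.MAX_VALUE: capped, the maximum the current output
      // mechanism allows
    }
  }
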
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6542d17e/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
index 283927c..ecbd749 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
@@ -31,10 +31,10 @@ public final class Constants {
   // User agent
   public static final String USER_AGENT_PREFIX = "fs.oss.user.agent.prefix";
   public static final String USER_AGENT_PREFIX_DEFAULT =
-          VersionInfoUtils.getDefaultUserAgent();
+      VersionInfoUtils.getDefaultUserAgent();
 
   // Class of credential provider
-  public static final String ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY =
+  public static final String CREDENTIALS_PROVIDER_KEY =
       "fs.oss.credentials.provider";
 
   // OSS access verification
@@ -82,10 +82,14 @@ public final class Constants {
   public static final int MAX_PAGING_KEYS_DEFAULT = 1000;
 
   // Size of each of or multipart pieces in bytes
-  public static final String MULTIPART_UPLOAD_SIZE_KEY =
+  public static final String MULTIPART_UPLOAD_PART_SIZE_KEY =
       "fs.oss.multipart.upload.size";
+  public static final long MULTIPART_UPLOAD_PART_SIZE_DEFAULT =
+      104857600; // 100 MB
+
+  /** The minimum multipart size which Aliyun OSS supports. */
+  public static final int MULTIPART_MIN_SIZE = 100 * 1024;
 
-  public static final long MULTIPART_UPLOAD_SIZE_DEFAULT = 10 * 1024 * 1024;
   public static final int MULTIPART_UPLOAD_PART_NUM_LIMIT = 10000;
 
   // Minimum size in bytes before we start a multipart uploads or copy
@@ -96,7 +100,6 @@ public final class Constants {
 
   public static final String MULTIPART_DOWNLOAD_SIZE_KEY =
       "fs.oss.multipart.download.size";
-
   public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 512 * 1024;
 
   public static final String MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY =
@@ -139,9 +142,14 @@ public final class Constants {
 
   public static final String FS_OSS_BLOCK_SIZE_KEY = "fs.oss.block.size";
   public static final int FS_OSS_BLOCK_SIZE_DEFAULT = 64 * 1024 * 1024;
+
   public static final String FS_OSS = "oss";
 
-  public static final long MIN_MULTIPART_UPLOAD_PART_SIZE = 100 * 1024L;
-  public static final int MAX_RETRIES = 10;
+  public static final String KEEPALIVE_TIME_KEY =
+      "fs.oss.threads.keepalivetime";
+  public static final int KEEPALIVE_TIME_DEFAULT = 60;
 
+  public static final String UPLOAD_ACTIVE_BLOCKS_KEY =
+      "fs.oss.upload.active.blocks";
+  public static final int UPLOAD_ACTIVE_BLOCKS_DEFAULT = 4;
 }

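The two new keys tune the shared oss-transfer-shared thread pool and the
per-stream upload concurrency. A small sketch of setting them (the chosen
values are examples only, not recommendations from the patch):

  import org.apache.hadoop.conf.Configuration;

  public class OssTuningSketch {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // Idle threads in the shared transfer pool linger for 120 seconds.
      conf.setInt("fs.oss.threads.keepalivetime", 120);
      // Allow up to 8 blocks queued/uploading per output stream.
      conf.setInt("fs.oss.upload.active.blocks", 8);
    }
  }
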
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6542d17e/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
new file mode 100644
index 0000000..365d931
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT;
+
+/**
+ * Tests regular and multi-part upload functionality for
+ * AliyunOSSBlockOutputStream.
+ */
+public class TestAliyunOSSBlockOutputStream {
+  private FileSystem fs;
+  private static String testRootPath =
+      AliyunOSSTestUtils.generateUniqueTestPath();
+
+  @Rule
+  public Timeout testTimeout = new Timeout(30 * 60 * 1000);
+
+  @Before
+  public void setUp() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024);
+    conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 5 * 1024 * 1024);
+    fs = AliyunOSSTestUtils.createTestFileSystem(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (fs != null) {
+      fs.delete(new Path(testRootPath), true);
+    }
+  }
+
+  private Path getTestPath() {
+    return new Path(testRootPath + "/test-aliyun-oss");
+  }
+
+  @Test
+  public void testZeroByteUpload() throws IOException {
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 0);
+  }
+
+  @Test
+  public void testRegularUpload() throws IOException {
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024 - 1);
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024);
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024 + 1);
+  }
+
+  @Test
+  public void testMultiPartUpload() throws IOException {
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
+        6 * 1024 * 1024 - 1);
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * 1024);
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
+        6 * 1024 * 1024 + 1);
+  }
+
+  @Test
+  public void testHugeUpload() throws IOException {
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
+        MULTIPART_UPLOAD_PART_SIZE_DEFAULT - 1);
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
+        MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
+        MULTIPART_UPLOAD_PART_SIZE_DEFAULT + 1);
+  }
+
+  @Test
+  public void testMultiPartUploadLimit() throws IOException {
+    long partSize1 = AliyunOSSUtils.calculatePartSize(10 * 1024, 100 * 1024);
+    assert(10 * 1024 / partSize1 < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
+
+    long partSize2 = AliyunOSSUtils.calculatePartSize(200 * 1024, 100 * 1024);
+    assert(200 * 1024 / partSize2 < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
+
+    long partSize3 = AliyunOSSUtils.calculatePartSize(10000 * 100 * 1024,
+        100 * 1024);
+    assert(10000 * 100 * 1024 / partSize3
+        < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
+
+    long partSize4 = AliyunOSSUtils.calculatePartSize(10001 * 100 * 1024,
+        100 * 1024);
+    assert(10001 * 100 * 1024 / partSize4
+        < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6542d17e/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
index 66068c6..32d0e46 100644
--- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
@@ -123,15 +123,15 @@ public class TestAliyunOSSInputStream {
         + fsDataInputStream.getPos(), fsDataInputStream.getPos() == 0);
 
     assertTrue("expected position at:"
-            + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
-            + in.getExpectNextPos(),
+        + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
+        + in.getExpectNextPos(),
         in.getExpectNextPos() == Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
     fsDataInputStream.seek(4 * 1024 * 1024);
     assertTrue("expected position at:" + 4 * 1024 * 1024
-            + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
-            + in.getExpectNextPos(),
+        + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
+        + in.getExpectNextPos(),
         in.getExpectNextPos() == 4 * 1024 * 1024
-            + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
+        + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
     IOUtils.closeStream(fsDataInputStream);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6542d17e/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSOutputStream.java
deleted file mode 100644
index 6b87d9c..0000000
--- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSOutputStream.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.aliyun.oss;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.contract.ContractTestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
-
-import java.io.IOException;
-
-/**
- * Tests regular and multi-part upload functionality for AliyunOSSOutputStream.
- */
-public class TestAliyunOSSOutputStream {
-  private FileSystem fs;
-  private static String testRootPath =
-      AliyunOSSTestUtils.generateUniqueTestPath();
-
-  @Rule
-  public Timeout testTimeout = new Timeout(30 * 60 * 1000);
-
-  @Before
-  public void setUp() throws Exception {
-    Configuration conf = new Configuration();
-    conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024);
-    conf.setInt(Constants.MULTIPART_UPLOAD_SIZE_KEY, 5 * 1024 * 1024);
-    fs = AliyunOSSTestUtils.createTestFileSystem(conf);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    if (fs != null) {
-      fs.delete(new Path(testRootPath), true);
-    }
-  }
-
-  protected Path getTestPath() {
-    return new Path(testRootPath + "/test-aliyun-oss");
-  }
-
-  @Test
-  public void testRegularUpload() throws IOException {
-    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024);
-  }
-
-  @Test
-  public void testMultiPartUpload() throws IOException {
-    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * 1024);
-  }
-
-  @Test
-  public void testMultiPartUploadLimit() throws IOException {
-    long partSize1 = AliyunOSSUtils.calculatePartSize(10 * 1024, 100 * 1024);
-    assert(10 * 1024 / partSize1 < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
-
-    long partSize2 = AliyunOSSUtils.calculatePartSize(200 * 1024, 100 * 1024);
-    assert(200 * 1024 / partSize2 < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
-
-    long partSize3 = AliyunOSSUtils.calculatePartSize(10000 * 100 * 1024,
-        100 * 1024);
-    assert(10000 * 100 * 1024 / partSize3
-        < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
-
-    long partSize4 = AliyunOSSUtils.calculatePartSize(10001 * 100 * 1024,
-        100 * 1024);
-    assert(10001 * 100 * 1024 / partSize4
-        < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6542d17e/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java
index 18d09d5..e9a98b3 100644
--- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java
@@ -33,7 +33,7 @@ public class TestAliyunOSSContractDistCp extends AbstractContractDistCpTest {
   protected Configuration createConfiguration() {
     Configuration newConf = super.createConfiguration();
     newConf.setLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, MULTIPART_SETTING);
-    newConf.setLong(MULTIPART_UPLOAD_SIZE_KEY, MULTIPART_SETTING);
+    newConf.setLong(MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_SETTING);
     return newConf;
   }
 

