HADOOP-15027. AliyunOSS: Support multi-thread pre-read to improve sequential 
read from Hadoop to Aliyun OSS performance. (Contributed by Jinhu Wu)

(cherry picked from commit 9195a6e302028ed3921d1016ac2fa5754f06ebf0)
(cherry picked from commit 55142849db02a9191db0dd6f4e1401ff19ec242a)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/082a707b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/082a707b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/082a707b

Branch: refs/heads/branch-3
Commit: 082a707bae4bb97444a34c00eecd62975807388d
Parents: db8345f
Author: Sammi Chen <sammi.c...@intel.com>
Authored: Wed Jan 17 15:55:59 2018 +0800
Committer: Sammi Chen <sammi.c...@intel.com>
Committed: Wed Jan 17 16:16:03 2018 +0800

----------------------------------------------------------------------
 .../dev-support/findbugs-exclude.xml            |   8 +
 .../fs/aliyun/oss/AliyunOSSFileReaderTask.java  | 109 ++++++++++++++
 .../fs/aliyun/oss/AliyunOSSFileSystem.java      |  31 +++-
 .../fs/aliyun/oss/AliyunOSSInputStream.java     | 149 +++++++++++++------
 .../hadoop/fs/aliyun/oss/AliyunOSSUtils.java    |  12 ++
 .../apache/hadoop/fs/aliyun/oss/Constants.java  |  13 +-
 .../apache/hadoop/fs/aliyun/oss/ReadBuffer.java |  86 +++++++++++
 .../fs/aliyun/oss/TestAliyunOSSInputStream.java |  49 ++++++
 8 files changed, 407 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/082a707b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
index 40d78d0..c55f8e3 100644
--- a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
@@ -15,4 +15,12 @@
    limitations under the License.
 -->
 <FindBugsFilter>
+    <!-- Disable FindBugs warning and return the buffer to caller directly.
+         It is convenient and efficient because we do not need to copy the buffer
+    -->
+    <Match>
+        <Class name="org.apache.hadoop.fs.aliyun.oss.ReadBuffer" />
+        <Method name="getBuffer" />
+        <Bug pattern="EI_EXPOSE_REP" />
+    </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/082a707b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileReaderTask.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileReaderTask.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileReaderTask.java
new file mode 100644
index 0000000..e5bfc2c
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileReaderTask.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Used by {@link AliyunOSSInputStream} as a task that is submitted
+ * to the thread pool.
+ * Each AliyunOSSFileReaderTask reads one part of the file so that
+ * we can accelerate the sequential read.
+ */
+public class AliyunOSSFileReaderTask implements Runnable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(AliyunOSSFileReaderTask.class);
+
+  private String key;
+  private AliyunOSSFileSystemStore store;
+  private ReadBuffer readBuffer;
+  private static final int MAX_RETRIES = 3;
+  private RetryPolicy retryPolicy;
+
+  public AliyunOSSFileReaderTask(String key, AliyunOSSFileSystemStore store,
+      ReadBuffer readBuffer) {
+    this.key = key;
+    this.store = store;
+    this.readBuffer = readBuffer;
+    RetryPolicy defaultPolicy =
+        RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+            MAX_RETRIES, 3, TimeUnit.SECONDS);
+    Map<Class<? extends Exception>, RetryPolicy> policies = new HashMap<>();
+    policies.put(IOException.class, defaultPolicy);
+    policies.put(IndexOutOfBoundsException.class,
+        RetryPolicies.TRY_ONCE_THEN_FAIL);
+    policies.put(NullPointerException.class,
+        RetryPolicies.TRY_ONCE_THEN_FAIL);
+
+    this.retryPolicy = RetryPolicies.retryByException(defaultPolicy, policies);
+  }
+
+  @Override
+  public void run() {
+    int retries = 0;
+    readBuffer.lock();
+    try {
+      while (true) {
+        try (InputStream in = store.retrieve(
+            key, readBuffer.getByteStart(), readBuffer.getByteEnd())) {
+          IOUtils.readFully(in, readBuffer.getBuffer(),
+              0, readBuffer.getBuffer().length);
+          readBuffer.setStatus(ReadBuffer.STATUS.SUCCESS);
+          break;
+        } catch (Exception e) {
+          LOG.warn("Exception thrown when retrieve key: "
+              + this.key + ", exception: " + e);
+          try {
+            RetryPolicy.RetryAction rc = retryPolicy.shouldRetry(
+                e, retries++, 0, true);
+            if (rc.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
+              Thread.sleep(rc.delayMillis);
+            } else {
+              //should not retry
+              break;
+            }
+          } catch (Exception ex) {
+            //FAIL
+            LOG.warn("Exception thrown when call shouldRetry, exception " + ex);
+            break;
+          }
+        }
+      }
+
+      if (readBuffer.getStatus() != ReadBuffer.STATUS.SUCCESS) {
+        readBuffer.setStatus(ReadBuffer.STATUS.ERROR);
+      }
+
+      // Notify the main thread that is waiting for this buffer
+      readBuffer.signalAll();
+    } finally {
+      readBuffer.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/082a707b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
index 41d475d..afff223 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
@@ -24,7 +24,9 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -41,12 +43,14 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
 import org.apache.hadoop.util.Progressable;
 
 import com.aliyun.oss.model.OSSObjectSummary;
 import com.aliyun.oss.model.ObjectListing;
 import com.aliyun.oss.model.ObjectMetadata;
 
+import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,6 +69,9 @@ public class AliyunOSSFileSystem extends FileSystem {
   private Path workingDir;
   private AliyunOSSFileSystemStore store;
   private int maxKeys;
+  private int maxReadAheadPartNumber;
+  private ListeningExecutorService boundedThreadPool;
+
   private static final PathFilter DEFAULT_FILTER = new PathFilter() {
     @Override
     public boolean accept(Path file) {
@@ -82,6 +89,7 @@ public class AliyunOSSFileSystem extends FileSystem {
   public void close() throws IOException {
     try {
       store.close();
+      boundedThreadPool.shutdown();
     } finally {
       super.close();
     }
@@ -309,10 +317,24 @@ public class AliyunOSSFileSystem extends FileSystem {
     store = new AliyunOSSFileSystemStore();
     store.initialize(name, conf, statistics);
     maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
+
+    int threadNum = AliyunOSSUtils.intPositiveOption(conf,
+        Constants.MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY,
+        Constants.MULTIPART_DOWNLOAD_THREAD_NUMBER_DEFAULT);
+
+    int totalTasks = AliyunOSSUtils.intPositiveOption(conf,
+        Constants.MAX_TOTAL_TASKS_KEY, Constants.MAX_TOTAL_TASKS_DEFAULT);
+
+    maxReadAheadPartNumber = AliyunOSSUtils.intPositiveOption(conf,
+        Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_KEY,
+        Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT);
+
+    this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
+        threadNum, totalTasks, 60L, TimeUnit.SECONDS, "oss-read-shared");
     setConf(conf);
   }
 
-  /**
+/**
    * Turn a path (relative or otherwise) into an OSS key.
    *
    * @param path the path of the file.
@@ -523,8 +545,11 @@ public class AliyunOSSFileSystem extends FileSystem {
           " because it is a directory");
     }
 
-    return new FSDataInputStream(new AliyunOSSInputStream(getConf(), store,
-        pathToKey(path), fileStatus.getLen(), statistics));
+    return new FSDataInputStream(new AliyunOSSInputStream(getConf(),
+        new SemaphoredDelegatingExecutor(
+            boundedThreadPool, maxReadAheadPartNumber, true),
+        maxReadAheadPartNumber, store, pathToKey(path), fileStatus.getLen(),
+        statistics));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/082a707b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
index 72ba619..494ac53 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
@@ -20,8 +20,11 @@ package org.apache.hadoop.fs.aliyun.oss;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.io.InputStream;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
 
+import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,20 +46,33 @@ public class AliyunOSSInputStream extends FSInputStream {
   private final String key;
   private Statistics statistics;
   private boolean closed;
-  private InputStream wrappedStream = null;
   private long contentLength;
   private long position;
   private long partRemaining;
+  private byte[] buffer;
+  private int maxReadAheadPartNumber;
+  private long expectNextPos;
+  private long lastByteStart;
+
+  private ExecutorService readAheadExecutorService;
+  private Queue<ReadBuffer> readBufferQueue = new ArrayDeque<>();
 
   public AliyunOSSInputStream(Configuration conf,
+      ExecutorService readAheadExecutorService, int maxReadAheadPartNumber,
       AliyunOSSFileSystemStore store, String key, Long contentLength,
       Statistics statistics) throws IOException {
+    this.readAheadExecutorService =
+        MoreExecutors.listeningDecorator(readAheadExecutorService);
     this.store = store;
     this.key = key;
     this.statistics = statistics;
     this.contentLength = contentLength;
     downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
         MULTIPART_DOWNLOAD_SIZE_DEFAULT);
+    this.maxReadAheadPartNumber = maxReadAheadPartNumber;
+
+    this.expectNextPos = 0;
+    this.lastByteStart = -1;
     reopen(0);
     closed = false;
   }
@@ -82,15 +98,81 @@ public class AliyunOSSInputStream extends FSInputStream {
       partSize = downloadPartSize;
     }
 
-    if (wrappedStream != null) {
+    if (this.buffer != null) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Aborting old stream to open at pos " + pos);
       }
-      wrappedStream.close();
+      this.buffer = null;
+    }
+
+    boolean isRandomIO = true;
+    if (pos == this.expectNextPos) {
+      isRandomIO = false;
+    } else {
+      // New seek: discard queued buffers until one whose byteStart equals pos
+      while (readBufferQueue.size() != 0) {
+        if (readBufferQueue.element().getByteStart() != pos) {
+          readBufferQueue.poll();
+        } else {
+          break;
+        }
+      }
+    }
+
+    this.expectNextPos = pos + partSize;
+
+    int currentSize = readBufferQueue.size();
+    if (currentSize == 0) {
+      //init lastByteStart to pos - partSize, used by for loop below
+      lastByteStart = pos - partSize;
+    } else {
+      ReadBuffer[] readBuffers = readBufferQueue.toArray(
+          new ReadBuffer[currentSize]);
+      lastByteStart = readBuffers[currentSize - 1].getByteStart();
     }
 
-    wrappedStream = store.retrieve(key, pos, pos + partSize -1);
-    if (wrappedStream == null) {
+    int maxLen = this.maxReadAheadPartNumber - currentSize;
+    for (int i = 0; i < maxLen && i < (currentSize + 1) * 2; i++) {
+      if (lastByteStart + partSize * (i + 1) > contentLength) {
+        break;
+      }
+
+      long byteStart = lastByteStart + partSize * (i + 1);
+      long byteEnd = byteStart + partSize -1;
+      if (byteEnd >= contentLength) {
+        byteEnd = contentLength - 1;
+      }
+
+      ReadBuffer readBuffer = new ReadBuffer(byteStart, byteEnd);
+      if (readBuffer.getBuffer().length == 0) {
+        //EOF
+        readBuffer.setStatus(ReadBuffer.STATUS.SUCCESS);
+      } else {
+        this.readAheadExecutorService.execute(
+            new AliyunOSSFileReaderTask(key, store, readBuffer));
+      }
+      readBufferQueue.add(readBuffer);
+      if (isRandomIO) {
+        break;
+      }
+    }
+
+    ReadBuffer readBuffer = readBufferQueue.poll();
+    readBuffer.lock();
+    try {
+      readBuffer.await(ReadBuffer.STATUS.INIT);
+      if (readBuffer.getStatus() == ReadBuffer.STATUS.ERROR) {
+        this.buffer = null;
+      } else {
+        this.buffer = readBuffer.getBuffer();
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("interrupted when wait a read buffer");
+    } finally {
+      readBuffer.unlock();
+    }
+
+    if (this.buffer == null) {
       throw new IOException("Null IO stream");
     }
     position = pos;
@@ -105,18 +187,10 @@ public class AliyunOSSInputStream extends FSInputStream {
       reopen(position);
     }
 
-    int tries = MAX_RETRIES;
-    boolean retry;
     int byteRead = -1;
-    do {
-      retry = false;
-      try {
-        byteRead = wrappedStream.read();
-      } catch (Exception e) {
-        handleReadException(e, --tries);
-        retry = true;
-      }
-    } while (retry);
+    if (partRemaining != 0) {
+      byteRead = this.buffer[this.buffer.length - (int)partRemaining] & 0xFF;
+    }
     if (byteRead >= 0) {
       position++;
       partRemaining--;
@@ -161,21 +235,18 @@ public class AliyunOSSInputStream extends FSInputStream {
         reopen(position);
       }
 
-      int tries = MAX_RETRIES;
-      boolean retry;
-      int bytes = -1;
-      do {
-        retry = false;
-        try {
-          bytes = wrappedStream.read(buf, off + bytesRead, len - bytesRead);
-        } catch (Exception e) {
-          handleReadException(e, --tries);
-          retry = true;
+      int bytes = 0;
+      for (int i = this.buffer.length - (int)partRemaining;
+           i < this.buffer.length; i++) {
+        buf[off + bytesRead] = this.buffer[i];
+        bytes++;
+        bytesRead++;
+        if (off + bytesRead >= len) {
+          break;
         }
-      } while (retry);
+      }
 
       if (bytes > 0) {
-        bytesRead += bytes;
         position += bytes;
         partRemaining -= bytes;
       } else if (partRemaining != 0) {
@@ -202,9 +273,7 @@ public class AliyunOSSInputStream extends FSInputStream {
       return;
     }
     closed = true;
-    if (wrappedStream != null) {
-      wrappedStream.close();
-    }
+    this.buffer = null;
   }
 
   @Override
@@ -225,7 +294,6 @@ public class AliyunOSSInputStream extends FSInputStream {
       return;
     } else if (pos > position && pos < position + partRemaining) {
       long len = pos - position;
-      AliyunOSSUtils.skipFully(wrappedStream, len);
       position = pos;
       partRemaining -= len;
     } else {
@@ -245,18 +313,7 @@ public class AliyunOSSInputStream extends FSInputStream {
     return false;
   }
 
-  private void handleReadException(Exception e, int tries) throws IOException{
-    if (tries == 0) {
-      throw new IOException(e);
-    }
-
-    LOG.warn("Some exceptions occurred in oss connection, try to reopen oss" +
-        " connection at position '" + position + "', " + e.getMessage());
-    try {
-      Thread.sleep(100);
-    } catch (InterruptedException e2) {
-      LOG.warn(e2.getMessage());
-    }
-    reopen(position);
+  public long getExpectNextPos() {
+    return this.expectNextPos;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/082a707b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
index fdf72e4..1a21608 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
@@ -40,6 +40,18 @@ final public class AliyunOSSUtils {
   private AliyunOSSUtils() {
   }
 
+  public static int intPositiveOption(
+      Configuration conf, String key, int defVal) {
+    int v = conf.getInt(key, defVal);
+    if (v <= 0) {
+      LOG.warn(key + " is configured to " + v
+          + ", will use default value: " + defVal);
+      v = defVal;
+    }
+
+    return v;
+  }
+
   /**
    * Used to get password from configuration.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/082a707b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
index dd71842..410adc9 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
@@ -97,7 +97,18 @@ public final class Constants {
   public static final String MULTIPART_DOWNLOAD_SIZE_KEY =
       "fs.oss.multipart.download.size";
 
-  public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 100 * 1024;
+  public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 512 * 1024;
+
+  public static final String MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY =
+      "fs.oss.multipart.download.threads";
+  public static final int MULTIPART_DOWNLOAD_THREAD_NUMBER_DEFAULT = 10;
+
+  public static final String MAX_TOTAL_TASKS_KEY = "fs.oss.max.total.tasks";
+  public static final int MAX_TOTAL_TASKS_DEFAULT = 128;
+
+  public static final String MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_KEY =
+      "fs.oss.multipart.download.ahead.part.max.number";
+  public static final int MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT = 4;
 
   // Comma separated list of directories
   public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/082a707b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/ReadBuffer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/ReadBuffer.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/ReadBuffer.java
new file mode 100644
index 0000000..46bb5bf
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/ReadBuffer.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This class is used by {@link AliyunOSSInputStream}
+ * and {@link AliyunOSSFileReaderTask} to buffer data that is read from OSS.
+ */
+public class ReadBuffer {
+  enum STATUS {
+    INIT, SUCCESS, ERROR
+  }
+  private final ReentrantLock lock = new ReentrantLock();
+
+  private Condition readyCondition = lock.newCondition();
+
+  private byte[] buffer;
+  private STATUS status;
+  private long byteStart;
+  private long byteEnd;
+
+  public ReadBuffer(long byteStart, long byteEnd) {
+    this.buffer = new byte[(int)(byteEnd - byteStart) + 1];
+
+    this.status = STATUS.INIT;
+    this.byteStart = byteStart;
+    this.byteEnd = byteEnd;
+  }
+
+  public void lock() {
+    lock.lock();
+  }
+
+  public void unlock() {
+    lock.unlock();
+  }
+
+  public void await(STATUS waitStatus) throws InterruptedException {
+    while (this.status == waitStatus) {
+      readyCondition.await();
+    }
+  }
+
+  public void signalAll() {
+    readyCondition.signalAll();
+  }
+
+  public byte[] getBuffer() {
+    return buffer;
+  }
+
+  public STATUS getStatus() {
+    return status;
+  }
+
+  public void setStatus(STATUS status) {
+    this.status = status;
+  }
+
+  public long getByteStart() {
+    return byteStart;
+  }
+
+  public long getByteEnd() {
+    return byteEnd;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/082a707b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
index 10c4edd..66068c6 100644
--- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Random;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -108,6 +109,54 @@ public class TestAliyunOSSInputStream {
   }
 
   @Test
+  public void testSequentialAndRandomRead() throws Exception {
+    Path smallSeekFile = setPath("/test/smallSeekFile.txt");
+    long size = 5 * 1024 * 1024;
+
+    ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
+    LOG.info("5MB file created: smallSeekFile.txt");
+
+    FSDataInputStream fsDataInputStream = this.fs.open(smallSeekFile);
+    AliyunOSSInputStream in =
+        (AliyunOSSInputStream)fsDataInputStream.getWrappedStream();
+    assertTrue("expected position at:" + 0 + ", but got:"
+        + fsDataInputStream.getPos(), fsDataInputStream.getPos() == 0);
+
+    assertTrue("expected position at:"
+            + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
+            + in.getExpectNextPos(),
+        in.getExpectNextPos() == Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
+    fsDataInputStream.seek(4 * 1024 * 1024);
+    assertTrue("expected position at:" + 4 * 1024 * 1024
+            + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
+            + in.getExpectNextPos(),
+        in.getExpectNextPos() == 4 * 1024 * 1024
+            + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
+    IOUtils.closeStream(fsDataInputStream);
+  }
+
+  @Test
+  public void testOSSFileReaderTask() throws Exception {
+    Path smallSeekFile = setPath("/test/smallSeekFileOSSFileReader.txt");
+    long size = 5 * 1024 * 1024;
+
+    ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
+    LOG.info("5MB file created: smallSeekFileOSSFileReader.txt");
+    ReadBuffer readBuffer = new ReadBuffer(12, 24);
+    AliyunOSSFileReaderTask task = new AliyunOSSFileReaderTask("1",
+        ((AliyunOSSFileSystem)this.fs).getStore(), readBuffer);
+    //NullPointerException, fail
+    task.run();
+    assertEquals(readBuffer.getStatus(), ReadBuffer.STATUS.ERROR);
+    //OK
+    task = new AliyunOSSFileReaderTask(
+        "test/test/smallSeekFileOSSFileReader.txt",
+        ((AliyunOSSFileSystem)this.fs).getStore(), readBuffer);
+    task.run();
+    assertEquals(readBuffer.getStatus(), ReadBuffer.STATUS.SUCCESS);
+  }
+
+  @Test
   public void testReadFile() throws Exception {
     final int bufLen = 256;
     final int sizeFlag = 5;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to