Repository: hadoop
Updated Branches:
  refs/heads/branch-3.0 55142849d -> b1cc14e9c


Revert "HADOOP-15027. AliyunOSS: Support multi-thread pre-read to improve 
sequential read from Hadoop to Aliyun OSS performance. (Contributed by Jinhu 
Wu)"

This reverts commit 55142849db02a9191db0dd6f4e1401ff19ec242a.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b1cc14e9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b1cc14e9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b1cc14e9

Branch: refs/heads/branch-3.0
Commit: b1cc14e9cbfacdab61adb4f35cc66825e0ba2291
Parents: 5514284
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Jan 17 09:53:24 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Jan 17 09:53:24 2018 -0600

----------------------------------------------------------------------
 .../dev-support/findbugs-exclude.xml            |   8 -
 .../fs/aliyun/oss/AliyunOSSFileReaderTask.java  | 109 --------------
 .../fs/aliyun/oss/AliyunOSSFileSystem.java      |  31 +---
 .../fs/aliyun/oss/AliyunOSSInputStream.java     | 149 ++++++-------------
 .../hadoop/fs/aliyun/oss/AliyunOSSUtils.java    |  12 --
 .../apache/hadoop/fs/aliyun/oss/Constants.java  |  13 +-
 .../apache/hadoop/fs/aliyun/oss/ReadBuffer.java |  86 -----------
 .../fs/aliyun/oss/TestAliyunOSSInputStream.java |  49 ------
 8 files changed, 50 insertions(+), 407 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b1cc14e9/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml 
b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
index c55f8e3..40d78d0 100644
--- a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
@@ -15,12 +15,4 @@
    limitations under the License.
 -->
 <FindBugsFilter>
-    <!-- Disable FindBugs warning and return the buffer to caller directly.
-         It is convenient and efficient because we do not need to copy the 
buffer
-    -->
-    <Match>
-        <Class name="org.apache.hadoop.fs.aliyun.oss.ReadBuffer" />
-        <Method name="getBuffer" />
-        <Bug pattern="EI_EXPOSE_REP" />
-    </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b1cc14e9/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileReaderTask.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileReaderTask.java
 
b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileReaderTask.java
deleted file mode 100644
index e5bfc2c..0000000
--- 
a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileReaderTask.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.aliyun.oss;
-
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Used by {@link AliyunOSSInputStream} as an task that submitted
- * to the thread pool.
- * Each AliyunOSSFileReaderTask reads one part of the file so that
- * we can accelerate the sequential read.
- */
-public class AliyunOSSFileReaderTask implements Runnable {
-  public static final Logger LOG =
-      LoggerFactory.getLogger(AliyunOSSFileReaderTask.class);
-
-  private String key;
-  private AliyunOSSFileSystemStore store;
-  private ReadBuffer readBuffer;
-  private static final int MAX_RETRIES = 3;
-  private RetryPolicy retryPolicy;
-
-  public AliyunOSSFileReaderTask(String key, AliyunOSSFileSystemStore store,
-      ReadBuffer readBuffer) {
-    this.key = key;
-    this.store = store;
-    this.readBuffer = readBuffer;
-    RetryPolicy defaultPolicy =
-        RetryPolicies.retryUpToMaximumCountWithFixedSleep(
-            MAX_RETRIES, 3, TimeUnit.SECONDS);
-    Map<Class<? extends Exception>, RetryPolicy> policies = new HashMap<>();
-    policies.put(IOException.class, defaultPolicy);
-    policies.put(IndexOutOfBoundsException.class,
-        RetryPolicies.TRY_ONCE_THEN_FAIL);
-    policies.put(NullPointerException.class,
-        RetryPolicies.TRY_ONCE_THEN_FAIL);
-
-    this.retryPolicy = RetryPolicies.retryByException(defaultPolicy, policies);
-  }
-
-  @Override
-  public void run() {
-    int retries = 0;
-    readBuffer.lock();
-    try {
-      while (true) {
-        try (InputStream in = store.retrieve(
-            key, readBuffer.getByteStart(), readBuffer.getByteEnd())) {
-          IOUtils.readFully(in, readBuffer.getBuffer(),
-              0, readBuffer.getBuffer().length);
-          readBuffer.setStatus(ReadBuffer.STATUS.SUCCESS);
-          break;
-        } catch (Exception e) {
-          LOG.warn("Exception thrown when retrieve key: "
-              + this.key + ", exception: " + e);
-          try {
-            RetryPolicy.RetryAction rc = retryPolicy.shouldRetry(
-                e, retries++, 0, true);
-            if (rc.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
-              Thread.sleep(rc.delayMillis);
-            } else {
-              //should not retry
-              break;
-            }
-          } catch (Exception ex) {
-            //FAIL
-            LOG.warn("Exception thrown when call shouldRetry, exception " + 
ex);
-            break;
-          }
-        }
-      }
-
-      if (readBuffer.getStatus() != ReadBuffer.STATUS.SUCCESS) {
-        readBuffer.setStatus(ReadBuffer.STATUS.ERROR);
-      }
-
-      //notify main thread which wait for this buffer
-      readBuffer.signalAll();
-    } finally {
-      readBuffer.unlock();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b1cc14e9/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
 
b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
index afff223..41d475d 100644
--- 
a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
+++ 
b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
@@ -24,9 +24,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -43,14 +41,12 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
 import org.apache.hadoop.util.Progressable;
 
 import com.aliyun.oss.model.OSSObjectSummary;
 import com.aliyun.oss.model.ObjectListing;
 import com.aliyun.oss.model.ObjectMetadata;
 
-import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,9 +65,6 @@ public class AliyunOSSFileSystem extends FileSystem {
   private Path workingDir;
   private AliyunOSSFileSystemStore store;
   private int maxKeys;
-  private int maxReadAheadPartNumber;
-  private ListeningExecutorService boundedThreadPool;
-
   private static final PathFilter DEFAULT_FILTER = new PathFilter() {
     @Override
     public boolean accept(Path file) {
@@ -89,7 +82,6 @@ public class AliyunOSSFileSystem extends FileSystem {
   public void close() throws IOException {
     try {
       store.close();
-      boundedThreadPool.shutdown();
     } finally {
       super.close();
     }
@@ -317,24 +309,10 @@ public class AliyunOSSFileSystem extends FileSystem {
     store = new AliyunOSSFileSystemStore();
     store.initialize(name, conf, statistics);
     maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
-
-    int threadNum = AliyunOSSUtils.intPositiveOption(conf,
-        Constants.MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY,
-        Constants.MULTIPART_DOWNLOAD_THREAD_NUMBER_DEFAULT);
-
-    int totalTasks = AliyunOSSUtils.intPositiveOption(conf,
-        Constants.MAX_TOTAL_TASKS_KEY, Constants.MAX_TOTAL_TASKS_DEFAULT);
-
-    maxReadAheadPartNumber = AliyunOSSUtils.intPositiveOption(conf,
-        Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_KEY,
-        Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT);
-
-    this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
-        threadNum, totalTasks, 60L, TimeUnit.SECONDS, "oss-read-shared");
     setConf(conf);
   }
 
-/**
+  /**
    * Turn a path (relative or otherwise) into an OSS key.
    *
    * @param path the path of the file.
@@ -545,11 +523,8 @@ public class AliyunOSSFileSystem extends FileSystem {
           " because it is a directory");
     }
 
-    return new FSDataInputStream(new AliyunOSSInputStream(getConf(),
-        new SemaphoredDelegatingExecutor(
-            boundedThreadPool, maxReadAheadPartNumber, true),
-        maxReadAheadPartNumber, store, pathToKey(path), fileStatus.getLen(),
-        statistics));
+    return new FSDataInputStream(new AliyunOSSInputStream(getConf(), store,
+        pathToKey(path), fileStatus.getLen(), statistics));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b1cc14e9/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
 
b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
index 494ac53..72ba619 100644
--- 
a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
+++ 
b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
@@ -20,11 +20,8 @@ package org.apache.hadoop.fs.aliyun.oss;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.concurrent.ExecutorService;
+import java.io.InputStream;
 
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -46,33 +43,20 @@ public class AliyunOSSInputStream extends FSInputStream {
   private final String key;
   private Statistics statistics;
   private boolean closed;
+  private InputStream wrappedStream = null;
   private long contentLength;
   private long position;
   private long partRemaining;
-  private byte[] buffer;
-  private int maxReadAheadPartNumber;
-  private long expectNextPos;
-  private long lastByteStart;
-
-  private ExecutorService readAheadExecutorService;
-  private Queue<ReadBuffer> readBufferQueue = new ArrayDeque<>();
 
   public AliyunOSSInputStream(Configuration conf,
-      ExecutorService readAheadExecutorService, int maxReadAheadPartNumber,
       AliyunOSSFileSystemStore store, String key, Long contentLength,
       Statistics statistics) throws IOException {
-    this.readAheadExecutorService =
-        MoreExecutors.listeningDecorator(readAheadExecutorService);
     this.store = store;
     this.key = key;
     this.statistics = statistics;
     this.contentLength = contentLength;
     downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
         MULTIPART_DOWNLOAD_SIZE_DEFAULT);
-    this.maxReadAheadPartNumber = maxReadAheadPartNumber;
-
-    this.expectNextPos = 0;
-    this.lastByteStart = -1;
     reopen(0);
     closed = false;
   }
@@ -98,81 +82,15 @@ public class AliyunOSSInputStream extends FSInputStream {
       partSize = downloadPartSize;
     }
 
-    if (this.buffer != null) {
+    if (wrappedStream != null) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Aborting old stream to open at pos " + pos);
       }
-      this.buffer = null;
-    }
-
-    boolean isRandomIO = true;
-    if (pos == this.expectNextPos) {
-      isRandomIO = false;
-    } else {
-      //new seek, remove cache buffers if its byteStart is not equal to pos
-      while (readBufferQueue.size() != 0) {
-        if (readBufferQueue.element().getByteStart() != pos) {
-          readBufferQueue.poll();
-        } else {
-          break;
-        }
-      }
-    }
-
-    this.expectNextPos = pos + partSize;
-
-    int currentSize = readBufferQueue.size();
-    if (currentSize == 0) {
-      //init lastByteStart to pos - partSize, used by for loop below
-      lastByteStart = pos - partSize;
-    } else {
-      ReadBuffer[] readBuffers = readBufferQueue.toArray(
-          new ReadBuffer[currentSize]);
-      lastByteStart = readBuffers[currentSize - 1].getByteStart();
+      wrappedStream.close();
     }
 
-    int maxLen = this.maxReadAheadPartNumber - currentSize;
-    for (int i = 0; i < maxLen && i < (currentSize + 1) * 2; i++) {
-      if (lastByteStart + partSize * (i + 1) > contentLength) {
-        break;
-      }
-
-      long byteStart = lastByteStart + partSize * (i + 1);
-      long byteEnd = byteStart + partSize -1;
-      if (byteEnd >= contentLength) {
-        byteEnd = contentLength - 1;
-      }
-
-      ReadBuffer readBuffer = new ReadBuffer(byteStart, byteEnd);
-      if (readBuffer.getBuffer().length == 0) {
-        //EOF
-        readBuffer.setStatus(ReadBuffer.STATUS.SUCCESS);
-      } else {
-        this.readAheadExecutorService.execute(
-            new AliyunOSSFileReaderTask(key, store, readBuffer));
-      }
-      readBufferQueue.add(readBuffer);
-      if (isRandomIO) {
-        break;
-      }
-    }
-
-    ReadBuffer readBuffer = readBufferQueue.poll();
-    readBuffer.lock();
-    try {
-      readBuffer.await(ReadBuffer.STATUS.INIT);
-      if (readBuffer.getStatus() == ReadBuffer.STATUS.ERROR) {
-        this.buffer = null;
-      } else {
-        this.buffer = readBuffer.getBuffer();
-      }
-    } catch (InterruptedException e) {
-      LOG.warn("interrupted when wait a read buffer");
-    } finally {
-      readBuffer.unlock();
-    }
-
-    if (this.buffer == null) {
+    wrappedStream = store.retrieve(key, pos, pos + partSize -1);
+    if (wrappedStream == null) {
       throw new IOException("Null IO stream");
     }
     position = pos;
@@ -187,10 +105,18 @@ public class AliyunOSSInputStream extends FSInputStream {
       reopen(position);
     }
 
+    int tries = MAX_RETRIES;
+    boolean retry;
     int byteRead = -1;
-    if (partRemaining != 0) {
-      byteRead = this.buffer[this.buffer.length - (int)partRemaining] & 0xFF;
-    }
+    do {
+      retry = false;
+      try {
+        byteRead = wrappedStream.read();
+      } catch (Exception e) {
+        handleReadException(e, --tries);
+        retry = true;
+      }
+    } while (retry);
     if (byteRead >= 0) {
       position++;
       partRemaining--;
@@ -235,18 +161,21 @@ public class AliyunOSSInputStream extends FSInputStream {
         reopen(position);
       }
 
-      int bytes = 0;
-      for (int i = this.buffer.length - (int)partRemaining;
-           i < this.buffer.length; i++) {
-        buf[off + bytesRead] = this.buffer[i];
-        bytes++;
-        bytesRead++;
-        if (off + bytesRead >= len) {
-          break;
+      int tries = MAX_RETRIES;
+      boolean retry;
+      int bytes = -1;
+      do {
+        retry = false;
+        try {
+          bytes = wrappedStream.read(buf, off + bytesRead, len - bytesRead);
+        } catch (Exception e) {
+          handleReadException(e, --tries);
+          retry = true;
         }
-      }
+      } while (retry);
 
       if (bytes > 0) {
+        bytesRead += bytes;
         position += bytes;
         partRemaining -= bytes;
       } else if (partRemaining != 0) {
@@ -273,7 +202,9 @@ public class AliyunOSSInputStream extends FSInputStream {
       return;
     }
     closed = true;
-    this.buffer = null;
+    if (wrappedStream != null) {
+      wrappedStream.close();
+    }
   }
 
   @Override
@@ -294,6 +225,7 @@ public class AliyunOSSInputStream extends FSInputStream {
       return;
     } else if (pos > position && pos < position + partRemaining) {
       long len = pos - position;
+      AliyunOSSUtils.skipFully(wrappedStream, len);
       position = pos;
       partRemaining -= len;
     } else {
@@ -313,7 +245,18 @@ public class AliyunOSSInputStream extends FSInputStream {
     return false;
   }
 
-  public long getExpectNextPos() {
-    return this.expectNextPos;
+  private void handleReadException(Exception e, int tries) throws IOException{
+    if (tries == 0) {
+      throw new IOException(e);
+    }
+
+    LOG.warn("Some exceptions occurred in oss connection, try to reopen oss" +
+        " connection at position '" + position + "', " + e.getMessage());
+    try {
+      Thread.sleep(100);
+    } catch (InterruptedException e2) {
+      LOG.warn(e2.getMessage());
+    }
+    reopen(position);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b1cc14e9/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
 
b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
index 1a21608..fdf72e4 100644
--- 
a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
+++ 
b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
@@ -40,18 +40,6 @@ final public class AliyunOSSUtils {
   private AliyunOSSUtils() {
   }
 
-  public static int intPositiveOption(
-      Configuration conf, String key, int defVal) {
-    int v = conf.getInt(key, defVal);
-    if (v <= 0) {
-      LOG.warn(key + " is configured to " + v
-          + ", will use default value: " + defVal);
-      v = defVal;
-    }
-
-    return v;
-  }
-
   /**
    * Used to get password from configuration.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b1cc14e9/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
 
b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
index 410adc9..dd71842 100644
--- 
a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
+++ 
b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
@@ -97,18 +97,7 @@ public final class Constants {
   public static final String MULTIPART_DOWNLOAD_SIZE_KEY =
       "fs.oss.multipart.download.size";
 
-  public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 512 * 1024;
-
-  public static final String MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY =
-      "fs.oss.multipart.download.threads";
-  public static final int MULTIPART_DOWNLOAD_THREAD_NUMBER_DEFAULT = 10;
-
-  public static final String MAX_TOTAL_TASKS_KEY = "fs.oss.max.total.tasks";
-  public static final int MAX_TOTAL_TASKS_DEFAULT = 128;
-
-  public static final String MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_KEY =
-      "fs.oss.multipart.download.ahead.part.max.number";
-  public static final int MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT = 4;
+  public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 100 * 1024;
 
   // Comma separated list of directories
   public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b1cc14e9/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/ReadBuffer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/ReadBuffer.java
 
b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/ReadBuffer.java
deleted file mode 100644
index 46bb5bf..0000000
--- 
a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/ReadBuffer.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.aliyun.oss;
-
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * This class is used by {@link AliyunOSSInputStream}
- * and {@link AliyunOSSFileReaderTask} to buffer data that read from oss.
- */
-public class ReadBuffer {
-  enum STATUS {
-    INIT, SUCCESS, ERROR
-  }
-  private final ReentrantLock lock = new ReentrantLock();
-
-  private Condition readyCondition = lock.newCondition();
-
-  private byte[] buffer;
-  private STATUS status;
-  private long byteStart;
-  private long byteEnd;
-
-  public ReadBuffer(long byteStart, long byteEnd) {
-    this.buffer = new byte[(int)(byteEnd - byteStart) + 1];
-
-    this.status = STATUS.INIT;
-    this.byteStart = byteStart;
-    this.byteEnd = byteEnd;
-  }
-
-  public void lock() {
-    lock.lock();
-  }
-
-  public void unlock() {
-    lock.unlock();
-  }
-
-  public void await(STATUS waitStatus) throws InterruptedException {
-    while (this.status == waitStatus) {
-      readyCondition.await();
-    }
-  }
-
-  public void signalAll() {
-    readyCondition.signalAll();
-  }
-
-  public byte[] getBuffer() {
-    return buffer;
-  }
-
-  public STATUS getStatus() {
-    return status;
-  }
-
-  public void setStatus(STATUS status) {
-    this.status = status;
-  }
-
-  public long getByteStart() {
-    return byteStart;
-  }
-
-  public long getByteEnd() {
-    return byteEnd;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b1cc14e9/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
 
b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
index 66068c6..10c4edd 100644
--- 
a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
+++ 
b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
@@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Random;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -109,54 +108,6 @@ public class TestAliyunOSSInputStream {
   }
 
   @Test
-  public void testSequentialAndRandomRead() throws Exception {
-    Path smallSeekFile = setPath("/test/smallSeekFile.txt");
-    long size = 5 * 1024 * 1024;
-
-    ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
-    LOG.info("5MB file created: smallSeekFile.txt");
-
-    FSDataInputStream fsDataInputStream = this.fs.open(smallSeekFile);
-    AliyunOSSInputStream in =
-        (AliyunOSSInputStream)fsDataInputStream.getWrappedStream();
-    assertTrue("expected position at:" + 0 + ", but got:"
-        + fsDataInputStream.getPos(), fsDataInputStream.getPos() == 0);
-
-    assertTrue("expected position at:"
-            + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
-            + in.getExpectNextPos(),
-        in.getExpectNextPos() == Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
-    fsDataInputStream.seek(4 * 1024 * 1024);
-    assertTrue("expected position at:" + 4 * 1024 * 1024
-            + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
-            + in.getExpectNextPos(),
-        in.getExpectNextPos() == 4 * 1024 * 1024
-            + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
-    IOUtils.closeStream(fsDataInputStream);
-  }
-
-  @Test
-  public void testOSSFileReaderTask() throws Exception {
-    Path smallSeekFile = setPath("/test/smallSeekFileOSSFileReader.txt");
-    long size = 5 * 1024 * 1024;
-
-    ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
-    LOG.info("5MB file created: smallSeekFileOSSFileReader.txt");
-    ReadBuffer readBuffer = new ReadBuffer(12, 24);
-    AliyunOSSFileReaderTask task = new AliyunOSSFileReaderTask("1",
-        ((AliyunOSSFileSystem)this.fs).getStore(), readBuffer);
-    //NullPointerException, fail
-    task.run();
-    assertEquals(readBuffer.getStatus(), ReadBuffer.STATUS.ERROR);
-    //OK
-    task = new AliyunOSSFileReaderTask(
-        "test/test/smallSeekFileOSSFileReader.txt",
-        ((AliyunOSSFileSystem)this.fs).getStore(), readBuffer);
-    task.run();
-    assertEquals(readBuffer.getStatus(), ReadBuffer.STATUS.SUCCESS);
-  }
-
-  @Test
   public void testReadFile() throws Exception {
     final int bufLen = 256;
     final int sizeFlag = 5;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to