steveloughran commented on a change in pull request #3736:
URL: https://github.com/apache/hadoop/pull/3736#discussion_r761936264



##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BlockCache.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Provides functionality necessary for caching blocks of data read from FileSystem.
+ */
+public interface BlockCache extends Closeable {
+
+  /**
+   * Indicates whether the given block is in this cache.
+   *
+   * @param blockNumber the id of the given block.
+   * @return true if the given block is in this cache, false otherwise.
+   */
+  boolean containsBlock(Integer blockNumber);

Review comment:
       why not int?
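   A primitive avoids needless boxing on every lookup; a minimal sketch of the suggested signature:

   ```java
   boolean containsBlock(int blockNumber);
   ```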

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BlockData.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+/**
+ * Holds information about blocks of data in an S3 file.
+ */
+public class BlockData {
+  // State of each block of data.
+  enum State {
+    // Data is not yet ready to be read.
+    NOT_READY,
+
+    // A read of this block has been queued.
+    QUEUED,
+
+    // This block is ready to be read.
+    READY,
+
+    // This block has been cached.
+    CACHED
+  }
+
+  // State of all blocks in an S3 file.
+  private State[] state;
+
+  // The size of an S3 file.
+  private final long fileSize;
+
+  // The S3 file is divided into blocks of this size.
+  private final int blockSize;
+
+  // The number of blocks in the S3 file.
+  private final int numBlocks;
+
+  /**
+   * Constructs an instance of {@link BlockData}.
+   *
+   * @param fileSize the size of an S3 file.
+   * @param blockSize the S3 file is divided into blocks of this size.
+   */
+  public BlockData(long fileSize, int blockSize) {
+    Validate.checkNotNegative(fileSize, "fileSize");
+    if (fileSize == 0) {
+      Validate.checkNotNegative(blockSize, "blockSize");
+    } else {
+      Validate.checkPositiveInteger(blockSize, "blockSize");
+    }
+
+    this.fileSize = fileSize;
+    this.blockSize = blockSize;
+    this.numBlocks =
+        (fileSize == 0) ? 0 : ((int) (fileSize / blockSize)) + (fileSize % blockSize > 0 ? 1 : 0);
+    this.state = new State[this.numBlocks];
+    for (int b = 0; b < this.numBlocks; b++) {
+      this.setState(b, State.NOT_READY);
+    }
+  }
+
+  public int getBlockSize() {
+    return this.blockSize;
+  }
+
+  public long getFileSize() {
+    return this.fileSize;
+  }
+
+  public int getNumBlocks() {
+    return this.numBlocks;
+  }
+
+  public boolean isLastBlock(int blockNumber) {
+    if (this.fileSize == 0) {
+      return false;
+    }
+
+    throwIfInvalidBlockNumber(blockNumber);
+
+    return blockNumber == (this.numBlocks - 1);
+  }
+
+  public int getBlockNumber(long offset) {
+    throwIfInvalidOffset(offset);
+
+    return (int) (offset / this.blockSize);
+  }
+
+  public int getSize(int blockNumber) {
+    if (this.fileSize == 0) {
+      return 0;
+    }
+
+    if (this.isLastBlock(blockNumber)) {
+      return (int) (this.fileSize - (((long) this.blockSize) * (this.numBlocks - 1)));
+    } else {
+      return this.blockSize;
+    }
+  }
+
+  public boolean isValidOffset(long offset) {
+    return (offset >= 0) && (offset < this.fileSize);
+  }
+
+  public long getStartOffset(int blockNumber) {
+    throwIfInvalidBlockNumber(blockNumber);
+
+    return blockNumber * (long) this.blockSize;
+  }
+
+  public int getRelativeOffset(int blockNumber, long offset) {
+    throwIfInvalidOffset(offset);
+
+    return (int) (offset - this.getStartOffset(blockNumber));
+  }
+
+  public State getState(int blockNumber) {
+    throwIfInvalidBlockNumber(blockNumber);
+
+    return this.state[blockNumber];
+  }
+
+  public State setState(int blockNumber, State blockState) {
+    throwIfInvalidBlockNumber(blockNumber);
+
+    this.state[blockNumber] = blockState;
+    return blockState;
+  }
+
+  // Debug helper.
+  public String getStateString() {
+    StringBuilder sb = new StringBuilder();
+    int blockNumber = 0;
+    while (blockNumber < this.numBlocks) {
+      State tstate = this.getState(blockNumber);
+      int endBlockNumber = blockNumber;
+      while ((endBlockNumber < this.numBlocks) && (this.getState(endBlockNumber) == tstate)) {
+        endBlockNumber++;
+      }
+      sb.append(String.format("[%03d ~ %03d] %s%n", blockNumber, endBlockNumber - 1, tstate));
+      blockNumber = endBlockNumber;
+    }
+    return sb.toString();
+  }
+
+  private void throwIfInvalidBlockNumber(int blockNumber) {
+    Validate.checkWithinRange(blockNumber, "blockNumber", 0, this.numBlocks - 1);
+  }
+
+  private void throwIfInvalidOffset(long offset) {

Review comment:
       minor: add javadocs, including @throws
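   e.g. something like the following; the body here is assumed to mirror throwIfInvalidBlockNumber, and the exception type is whatever Validate raises:

   ```java
   /**
    * Validates the given offset.
    *
    * @param offset the offset to check.
    * @throws IllegalArgumentException if offset is outside the range [0, fileSize).
    */
   private void throwIfInvalidOffset(long offset) {
     Validate.checkWithinRange(offset, "offset", 0, this.fileSize - 1);
   }
   ```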

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/Validate.java
##########
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collection;
+
+/**
+ * A superset of Validate class in Apache commons lang3.
+ *
+ * It provides consistent message strings for frequently encountered checks.
+ * That simplifies callers because they have to supply only the name of the argument
+ * that failed a check instead of having to supply the entire message.
+ */
+public final class Validate {
+  private Validate() {}
+
+  /**
+   * Validates that the given reference argument is not null.
+   *
+   * @param obj the argument reference to validate.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkNotNull(Object obj, String argName) {
+    checkArgument(obj != null, "'%s' must not be null.", argName);
+  }
+
+  /**
+   * Validates that the given integer argument is not zero or negative.
+   *
+   * @param value the argument value to validate
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkPositiveInteger(long value, String argName) {
+    checkArgument(value > 0, "'%s' must be a positive integer.", argName);
+  }
+
+  /**
+   * Validates that the given integer argument is not negative.
+   *
+   * @param value the argument value to validate
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkNotNegative(long value, String argName) {
+    checkArgument(value >= 0, "'%s' must not be negative.", argName);
+  }
+
+  /**
+   * Validates that the expression (that checks a required field is present) is true.
+   *
+   * @param isPresent indicates whether the given argument is present.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkRequired(boolean isPresent, String argName) {
+    checkArgument(isPresent, "'%s' is required.", argName);
+  }
+
+  /**
+   * Validates that the expression (that checks a field is valid) is true.
+   *
+   * @param isValid indicates whether the given argument is valid.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkValid(boolean isValid, String argName) {
+    checkArgument(isValid, "'%s' is invalid.", argName);
+  }
+
+  /**
+   * Validates that the expression (that checks a field is valid) is true.
+   *
+   * @param isValid indicates whether the given argument is valid.
+   * @param argName the name of the argument being validated.
+   * @param validValues the list of values that are allowed.
+   */
+  public static void checkValid(boolean isValid, String argName, String validValues) {
+    checkArgument(isValid, "'%s' is invalid. Valid values are: %s.", argName, validValues);
+  }
+
+  /**
+   * Validates that the given string is not null and has non-zero length.
+   *
+   * @param arg the argument reference to validate.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkNotNullAndNotEmpty(String arg, String argName) {
+    Validate.checkNotNull(arg, argName);
+    Validate.checkArgument(
+        arg.length() > 0,
+        "'%s' must not be empty.",
+        argName);
+  }
+
+  /**
+   * Validates that the given array is not null and has at least one element.
+   *
+   * @param <T> the type of array's elements.
+   * @param array the argument reference to validate.
+   * @param argName the name of the argument being validated.
+   */
+  public static <T> void checkNotNullAndNotEmpty(T[] array, String argName) {
+    Validate.checkNotNull(array, argName);
+    checkNotEmpty(array.length, argName);
+  }
+
+  /**
+   * Validates that the given array is not null and has at least one element.
+   *
+   * @param array the argument reference to validate.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkNotNullAndNotEmpty(byte[] array, String argName) {
+    Validate.checkNotNull(array, argName);
+    checkNotEmpty(array.length, argName);
+  }
+
+  /**
+   * Validates that the given array is not null and has at least one element.
+   *
+   * @param array the argument reference to validate.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkNotNullAndNotEmpty(short[] array, String argName) {
+    Validate.checkNotNull(array, argName);
+    checkNotEmpty(array.length, argName);
+  }
+
+  /**
+   * Validates that the given array is not null and has at least one element.
+   *
+   * @param array the argument reference to validate.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkNotNullAndNotEmpty(int[] array, String argName) {
+    Validate.checkNotNull(array, argName);
+    checkNotEmpty(array.length, argName);
+  }
+
+  /**
+   * Validates that the given array is not null and has at least one element.
+   *
+   * @param array the argument reference to validate.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkNotNullAndNotEmpty(long[] array, String argName) {
+    Validate.checkNotNull(array, argName);
+    checkNotEmpty(array.length, argName);
+  }
+
+  /**
+   * Validates that the given iterable is not null and has at least one element.
+   *
+   * @param <T> the type of iterable's elements.
+   * @param iter the argument reference to validate.
+   * @param argName the name of the argument being validated.
+   */
+  public static <T> void checkNotNullAndNotEmpty(Iterable<T> iter, String argName) {
+    Validate.checkNotNull(iter, argName);
+    int minNumElements = iter.iterator().hasNext() ? 1 : 0;
+    checkNotEmpty(minNumElements, argName);
+  }
+
+  /**
+   * Validates that the given collection is not null and has an exact number of items.
+   *
+   * @param <T> the type of collection's elements.
+   * @param collection the argument reference to validate.
+   * @param numElements the expected number of elements in the collection.
+   * @param argName the name of the argument being validated.
+   */
+  public static <T> void checkNotNullAndNumberOfElements(
+      Collection<T> collection, int numElements, String argName) {
+    Validate.checkNotNull(collection, argName);
+    checkArgument(
+        collection.size() == numElements,
+        "Number of elements in '%s' must be exactly %s, %s given.",
+        argName,
+        numElements,
+        collection.size()
+    );
+  }
+
+  /**
+   * Validates that the given two values are equal.
+   *
+   * @param value1 the first value to check.
+   * @param value1Name the name of the first argument.
+   * @param value2 the second value to check.
+   * @param value2Name the name of the second argument.
+   */
+  public static void checkValuesEqual(
+      long value1,
+      String value1Name,
+      long value2,
+      String value2Name) {
+    checkArgument(
+        value1 == value2,
+        "'%s' (%s) must equal '%s' (%s).",
+        value1Name,
+        value1,
+        value2Name,
+        value2);
+  }
+
+  /**
+   * Validates that the first value is an integer multiple of the second value.
+   *
+   * @param value1 the first value to check.
+   * @param value1Name the name of the first argument.
+   * @param value2 the second value to check.
+   * @param value2Name the name of the second argument.
+   */
+  public static void checkIntegerMultiple(
+      long value1,
+      String value1Name,
+      long value2,
+      String value2Name) {
+    checkArgument(
+        (value1 % value2) == 0,
+        "'%s' (%s) must be an integer multiple of '%s' (%s).",
+        value1Name,
+        value1,
+        value2Name,
+        value2);
+  }
+
+  /**
+   * Validates that the first value is greater than the second value.
+   *
+   * @param value1 the first value to check.
+   * @param value1Name the name of the first argument.
+   * @param value2 the second value to check.
+   * @param value2Name the name of the second argument.
+   */
+  public static void checkGreater(
+      long value1,
+      String value1Name,
+      long value2,
+      String value2Name) {
+    checkArgument(
+        value1 > value2,
+        "'%s' (%s) must be greater than '%s' (%s).",
+        value1Name,
+        value1,
+        value2Name,
+        value2);
+  }
+
+  /**
+   * Validates that the first value is greater than or equal to the second value.
+   *
+   * @param value1 the first value to check.
+   * @param value1Name the name of the first argument.
+   * @param value2 the second value to check.
+   * @param value2Name the name of the second argument.
+   */
+  public static void checkGreaterOrEqual(
+      long value1,
+      String value1Name,
+      long value2,
+      String value2Name) {
+    checkArgument(
+        value1 >= value2,
+        "'%s' (%s) must be greater than or equal to '%s' (%s).",
+        value1Name,
+        value1,
+        value2Name,
+        value2);
+  }
+
+  /**
+   * Validates that the first value is less than or equal to the second value.
+   *
+   * @param value1 the first value to check.
+   * @param value1Name the name of the first argument.
+   * @param value2 the second value to check.
+   * @param value2Name the name of the second argument.
+   */
+  public static void checkLessOrEqual(
+      long value1,
+      String value1Name,
+      long value2,
+      String value2Name) {
+    checkArgument(
+        value1 <= value2,
+        "'%s' (%s) must be less than or equal to '%s' (%s).",
+        value1Name,
+        value1,
+        value2Name,
+        value2);
+  }
+
+  /**
+   * Validates that the given value is within the given range of values.
+   *
+   * @param value the value to check.
+   * @param valueName the name of the argument.
+   * @param minValueInclusive inclusive lower limit for the value.
+   * @param maxValueInclusive inclusive upper limit for the value.
+   */
+  public static void checkWithinRange(
+      long value,
+      String valueName,
+      long minValueInclusive,
+      long maxValueInclusive) {
+    checkArgument(
+        (value >= minValueInclusive) && (value <= maxValueInclusive),
+        "'%s' (%s) must be within the range [%s, %s].",
+        valueName,
+        value,
+        minValueInclusive,
+        maxValueInclusive);
+  }
+
+  /**
+   * Validates that the given value is within the given range of values.
+   *
+   * @param value the value to check.
+   * @param valueName the name of the argument.
+   * @param minValueInclusive inclusive lower limit for the value.
+   * @param maxValueInclusive inclusive upper limit for the value.
+   */
+  public static void checkWithinRange(
+      double value,
+      String valueName,
+      double minValueInclusive,
+      double maxValueInclusive) {
+    checkArgument(
+        (value >= minValueInclusive) && (value <= maxValueInclusive),
+        "'%s' (%s) must be within the range [%s, %s].",
+        valueName,
+        value,
+        minValueInclusive,
+        maxValueInclusive);
+  }
+
+  /**
+   * Validates that the given path exists.
+   *
+   * @param path the path to check.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkPathExists(Path path, String argName) {
+    checkNotNull(path, argName);
+    checkArgument(Files.exists(path), "Path %s (%s) does not exist.", argName, path);
+  }
+
+  /**
+   * Validates that the given path exists and is a directory.
+   *
+   * @param path the path to check.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkPathExistsAsDir(Path path, String argName) {
+    checkPathExists(path, argName);
+    checkArgument(
+        Files.isDirectory(path),
+        "Path %s (%s) must point to a directory.",
+        argName,
+        path);
+  }
+
+  /**
+   * Validates that the given path exists and is a file.
+   *
+   * @param path the path to check.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkPathExistsAsFile(Path path, String argName) {
+    checkPathExists(path, argName);
+    checkArgument(Files.isRegularFile(path), "Path %s (%s) must point to a file.", argName, path);
+  }
+
+  public static void checkArgument(boolean expression, String format, Object... args) {

Review comment:
       We have our own version of guava preconditions in org.apache.hadoop.util; please use.
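   A sketch of what that swap could look like, assuming the hadoop Preconditions class offers the guava-style checkArgument(boolean, String, Object...) overload:

   ```java
   import org.apache.hadoop.util.Preconditions;

   public static void checkNotNegative(long value, String argName) {
     Preconditions.checkArgument(value >= 0, "'%s' must not be negative.", argName);
   }
   ```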

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/SingleFilePerBlockCache.java
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import org.slf4j.Logger;

Review comment:
       nit: import ordering.
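   i.e. java.* first, then third-party, then org.apache.hadoop, with blank lines between the groups (imports here illustrative):

   ```java
   import java.io.IOException;
   import java.nio.ByteBuffer;

   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   import org.apache.hadoop.io.IOUtils;
   ```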

##########
File path: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import com.amazonaws.services.s3.AmazonS3;
+import org.apache.hadoop.fs.common.Validate;
+import org.apache.hadoop.fs.s3a.MockS3ClientFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+class TestS3File extends S3File {

Review comment:
       `Test` must only be used for junit test suites

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3BlockManager.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import org.apache.hadoop.fs.common.BlockData;
+import org.apache.hadoop.fs.common.BlockManager;
+import org.apache.hadoop.fs.common.Validate;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Provides read access to an S3 file one block at a time.
+ *
+ * A naive implementation of a {@code BlockManager} that provides no prefetching or caching.
+ * Useful baseline for comparing the performance difference against {@code S3CachingBlockManager}.
+ */
+public class S3BlockManager extends BlockManager {
+
+  // Reader that reads from S3 file.
+  private S3Reader reader;
+
+  /**
+   * Constructs an instance of {@code S3BlockManager}.
+   *
+   * @param reader a reader that reads from S3 file.
+   * @param blockData information about each block of the S3 file.
+   */
+  public S3BlockManager(S3Reader reader, BlockData blockData) {
+    super(blockData);
+
+    Validate.checkNotNull(reader, "reader");
+
+    this.reader = reader;
+  }
+
+  /**
+   * Reads into the given {@code buffer} {@code size} bytes from the underlying file
+   * starting at {@code startOffset}.
+   *
+   * @param buffer the buffer to read data in to.
+   * @param startOffset the offset at which reading starts.
+   * @param size the number of bytes to read.
+   * @return number of bytes read.
+   */
+  @Override
+  public int read(ByteBuffer buffer, long startOffset, int size) throws IOException {
+    return this.reader.read(buffer, startOffset, size);

Review comment:
       nit: code convention is to use `this.` only in constructors and setters.
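   i.e.

   ```java
   return reader.read(buffer, startOffset, size);
   ```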

##########
File path: hadoop-tools/hadoop-aws/pom.xml
##########
@@ -412,6 +412,10 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
+          <includes>

Review comment:
       no

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import org.apache.hadoop.fs.common.Validate;
+import org.apache.hadoop.fs.common.Io;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Encapsulates low-level interactions with an S3 object on AWS.
+ */
+public class S3File implements Closeable {
+  // Client used for accessing S3 objects.
+  private final AmazonS3 client;
+
+  private final String bucket;
+  private final String key;
+
+  // Size of S3 file.
+  private long fileSize;
+
+  // Maps a stream returned by openForRead() to the associated S3 object.
+  // That allows us to close the object when closing the stream.
+  private Map<InputStream, S3Object> s3Objects;
+
+  /**
+   * Creates an instance of {@link S3File}.
+   *
+   * @param client AWS S3 client instance.
+   * @param bucket the bucket to read from.
+   * @param key the key to read from.
+   */
+  public S3File(AmazonS3 client, String bucket, String key) {
+    this(client, bucket, key, -1);
+  }
+
+  /**
+   * Creates an instance of {@link S3File}.
+   *
+   * @param client AWS S3 client instance.
+   * @param bucket the bucket to read from.
+   * @param key the key to read from.
+   * @param size the size of this file. It should be passed in if known beforehand
+   *        to avoid an additional network call. If size is not known,
+   *        pass a negative number so that size can be obtained if needed.
+   */
+  public S3File(AmazonS3 client, String bucket, String key, long size) {
+    Validate.checkNotNull(client, "client");
+    Validate.checkNotNullAndNotEmpty(bucket, "bucket");
+    Validate.checkNotNullAndNotEmpty(key, "key");
+
+    this.client = client;
+    this.bucket = bucket;
+    this.key = key;
+    this.fileSize = size;
+    this.s3Objects = new IdentityHashMap<InputStream, S3Object>();
+  }
+
+  public String getPath() {
+    return String.format("s3://%s/%s", this.bucket, this.key);
+  }
+
+  /**
+   * Gets the size of this file.
+   * Its value is cached once obtained from AWS.
+   *
+   * @return the size of this file.
+   * @throws IOException if there is an error obtaining file size.
+   */
+  public long size() throws IOException {
+    if (this.fileSize < 0) {
+      this.fileSize = client.getObjectMetadata(bucket, key).getContentLength();

Review comment:
       This will be passed in to the input stream when constructed, retrieved either from a HEAD request in open() or possibly via a FileStatus/length passed in an openFile() call. The Etag/object version should also be retrieved then and used in subsequent requests to ensure consistent object versions.
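   A rough sketch of pinning the version on the GET path with the v1 SDK; `eTag`, `offset` and `size` here are assumed to come from the open-time status and the read request:

   ```java
   GetObjectRequest request = new GetObjectRequest(bucket, key)
       .withRange(offset, offset + size - 1)
       // pin the object version; a changed object fails the constraint
       .withMatchingETagConstraint(eTag);
   // the v1 SDK returns null when an ETag constraint is not met
   S3Object object = client.getObject(request);
   ```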

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
##########
@@ -1442,7 +1477,31 @@ protected URI canonicalizeUri(URI rawUri) {
   @Retries.RetryTranslated
   public FSDataInputStream open(Path f, int bufferSize)
       throws IOException {
-    return open(f, Optional.empty(), Optional.empty());
+    if (this.prefetchEnabled) {
+      return this.createPrefetchingInputStream(f, bufferSize);
+    } else {
+      return open(f, Optional.empty(), Optional.empty());

Review comment:
       This must go in the open/3 method below, so that openFile() will also use it.
   You will also get the benefits of the FileStatus retrieved during the existence checks, with length and etag; openFile() allows callers to pass it in so we can skip the HEAD request.

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
##########
@@ -540,7 +540,7 @@ private Constants() {
 
   public static final String USER_AGENT_PREFIX = "fs.s3a.user.agent.prefix";
 
-  /** Whether or not to allow MetadataStore to be source of truth for a path prefix */
+  /** Whether or not to allow MetadataStore to be source of truth for a path prefix. */

Review comment:
       revert

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3EInputStream.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.twitter.util.FuturePool;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Enhanced {@code InputStream} for reading from S3.
+ *
+ * This implementation provides improved read throughput by asynchronously prefetching
+ * blocks of configurable size from the underlying S3 file.
+ */
+public class S3EInputStream extends FSInputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(S3EInputStream.class);
+
+  // Underlying input stream used for reading S3 file.
+  private S3InputStream inputStream;
+
+  // S3 access stats.
+  private FileSystem.Statistics stats;
+
+  /**
+   * Constructs an instance of {@link S3EInputStream}.
+   *
+   * @param futurePool Future pool used for async reading activity.
+   * @param prefetchBlockSize Size of each prefetched block.
+   * @param numBlocksToPrefetch Size of the prefetch queue (in number of blocks).
+   * @param bucket Name of S3 bucket from which key is opened.
+   * @param key Name of the S3 key to open.
+   * @param contentLength length of the file.
+   * @param client S3 access client.
+   * @param stats {@link FileSystem} stats related to read operation.
+   */
+  public S3EInputStream(
+      FuturePool futurePool,
+      int prefetchBlockSize,
+      int numBlocksToPrefetch,
+      String bucket,
+      String key,
+      long contentLength,
+      AmazonS3 client,
+      FileSystem.Statistics stats) {

Review comment:
       Expect to be constructed with a reference to an org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics instance. A stub one can be supplied for testing.
   The production one will update the file system statistics as appropriate.

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/Validate.java
##########
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collection;
+
+/**
+ * A superset of Validate class in Apache commons lang3.
+ *
+ * It provides consistent message strings for frequently encountered checks.
+ * That simplifies callers because they have to supply only the name of the argument
+ * that failed a check instead of having to supply the entire message.
+ */
+public final class Validate {
+  private Validate() {}
+
+  /**
+   * Validates that the given reference argument is not null.
+   *
+   * @param obj the argument reference to validate.
+   * @param argName the name of the argument being validated.
+   */
+  public static void checkNotNull(Object obj, String argName) {
+    checkArgument(obj != null, "'%s' must not be null.", argName);

Review comment:
       We are adopting `Objects.requireNonNull()` as it can be used in assignments.
   1. Please use it directly where appropriate.
   2. These new validators should follow the same design.
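   e.g. directly in an assignment:

   ```java
   import java.util.Objects;

   this.reader = Objects.requireNonNull(reader, "reader");
   ```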

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BlockData.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+/**
+ * Holds information about blocks of data in an S3 file.
+ */
+public class BlockData {
+  // State of each block of data.
+  enum State {
+    // Data is not yet ready to be read.
+    NOT_READY,
+
+    // A read of this block has been queued.
+    QUEUED,
+
+    // This block is ready to be read.
+    READY,
+
+    // This block has been cached.
+    CACHED
+  }
+
+  // State of all blocks in an S3 file.
+  private State[] state;
+
+  // The size of an S3 file.
+  private final long fileSize;
+
+  // The S3 file is divided into blocks of this size.
+  private final int blockSize;
+
+  // The S3 file has these many blocks.
+  private final int numBlocks;
+
+  /**
+   * Constructs an instance of {@link BlockData}.
+   *
+   * @param fileSize the size of an S3 file.
+   * @param blockSize the S3 file is divided into blocks of this size.
+   */
+  public BlockData(long fileSize, int blockSize) {
+    Validate.checkNotNegative(fileSize, "fileSize");
+    if (fileSize == 0) {
+      Validate.checkNotNegative(blockSize, "blockSize");
+    } else {
+      Validate.checkPositiveInteger(blockSize, "blockSize");
+    }
+
+    this.fileSize = fileSize;
+    this.blockSize = blockSize;
+    this.numBlocks =
+        (fileSize == 0) ? 0 : ((int) (fileSize / blockSize)) + (fileSize % blockSize > 0 ? 1 : 0);
+    this.state = new State[this.numBlocks];
+    for (int b = 0; b < this.numBlocks; b++) {
+      this.setState(b, State.NOT_READY);
+    }
+  }
+
+  public int getBlockSize() {
+    return this.blockSize;
+  }
+
+  public long getFileSize() {
+    return this.fileSize;
+  }
+
+  public int getNumBlocks() {
+    return this.numBlocks;
+  }
+
+  public boolean isLastBlock(int blockNumber) {
+    if (this.fileSize == 0) {
+      return false;
+    }
+
+    throwIfInvalidBlockNumber(blockNumber);
+
+    return blockNumber == (this.numBlocks - 1);
+  }
+
+  public int getBlockNumber(long offset) {
+    throwIfInvalidOffset(offset);
+
+    return (int) (offset / this.blockSize);
+  }
+
+  public int getSize(int blockNumber) {

Review comment:
       minor: add javadocs

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java
##########
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import com.twitter.util.Await;
+import com.twitter.util.ExceptionalFunction0;
+import com.twitter.util.Future;
+import com.twitter.util.FuturePool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Provides read access to the underlying file one block at a time.
+ * Improves read performance by prefetching and locally caching blocks.
+ */
+public abstract class CachingBlockManager extends BlockManager {
+  private static final Logger LOG = LoggerFactory.getLogger(CachingBlockManager.class);
+
+  // Asynchronous tasks are performed in this pool.
+  private FuturePool futurePool;

Review comment:
       `final` here and elsewhere

##########
File path: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/ExceptionAsserts.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public final class ExceptionAsserts {

Review comment:
       Replace completely with org.apache.hadoop.test.LambdaTestUtils
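   e.g. (constructor arguments illustrative):

   ```java
   import org.apache.hadoop.test.LambdaTestUtils;

   LambdaTestUtils.intercept(IllegalArgumentException.class,
       "'fileSize' must not be negative",
       () -> new BlockData(-1, 2));
   ```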

##########
File path: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/S3AccessRetryerTest.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.fs.common.ExceptionAsserts;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class S3AccessRetryerTest {

Review comment:
       unless the suite starts with `Test`, maven will not run it.
   1. Re-name this and the other test suites.
   2. Make them subclasses of org.apache.hadoop.test.AbstractHadoopTestBase.
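   e.g. (new name illustrative):

   ```java
   import org.apache.hadoop.test.AbstractHadoopTestBase;

   public class TestS3AccessRetryer extends AbstractHadoopTestBase {
     // existing test cases move across unchanged
   }
   ```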
   

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/README.md
##########
@@ -0,0 +1,107 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# High Performance S3 InputStream

Review comment:
       Move to hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BlockData.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+/**
+ * Holds information about blocks of data in an S3 file.

Review comment:
       nit: remove references to S3.

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java
##########
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import com.twitter.util.Await;

Review comment:
       Does this add a new jar to production classpaths?
   
   We are always really reluctant to do this as it always creates pain downstream. We create enough grief just by changing the versions of things we already ship.
   Is there any way to remove it? This includes using something from guava if need be.
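   For instance, the JDK's CompletableFuture running over the filesystem's existing bounded thread pool might cover the async read use case without any new dependency; a rough sketch, with `readBlock`, `blockNumber` and `executorService` as illustrative names:

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;

   // schedule a block prefetch on the shared executor instead of a twitter FuturePool
   CompletableFuture<Void> prefetch =
       CompletableFuture.runAsync(() -> readBlock(blockNumber), executorService);
   ```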

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3AccessRetryer.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import org.apache.hadoop.fs.common.Validate;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Encapsulates retry related functionality when accessing S3.
+ */
+public class S3AccessRetryer {
+  private static final Logger LOG = LoggerFactory.getLogger(S3AccessRetryer.class);
+
+  // Default value of delay before first retry.
+  public static final int RETRY_BASE_DELAY_MS = 500;
+
+  // Default value of maximum delay after which no additional retries will be attempted.
+  public static final int RETRY_MAX_BACKOFF_TIME_MS = 5000;
+
+  private int retryDelay;
+  private int retryMaxDelay;
+
+  public S3AccessRetryer() {
+    this(RETRY_BASE_DELAY_MS, RETRY_MAX_BACKOFF_TIME_MS);
+  }
+
+  /**
+   * Constructs an instance of {@link S3AccessRetryer}.
+   *
+   * @param baseDelay the delay before attempting the first retry.
+   * @param maxDelay the amount of delay after which no additional retries will be attempted.
+   */
+  public S3AccessRetryer(int baseDelay, int maxDelay) {
+    Validate.checkPositiveInteger(baseDelay, "baseDelay");
+    Validate.checkGreater(maxDelay, "maxDelay", baseDelay, "baseDelay");
+
+    this.retryDelay = baseDelay;
+    this.retryMaxDelay = maxDelay;
+  }
+
+  /**
+   * Determines whether the caller should retry an S3 access attempt.
+   * If a retry is needed, performs a {@code Thread.sleep()} before
+   * returning true; otherwise, returns false immediately.
+   *
+   * @param e the exception encountered by the caller.
+   * @return true if the caller should retry an S3 access attempt, false otherwise.
+   */
+  public boolean retry(Exception e) {
+    if (isRetryable(e) && (this.retryDelay <= this.retryMaxDelay)) {
+      try {
+        Thread.sleep(this.retryDelay);
+      } catch (InterruptedException ie) {
+        return false;
+      }
+      this.retryDelay *= 2;
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Determines whether the current exception is retryable.
+   */
+  private boolean isRetryable(Exception e) {
+    if (e instanceof IOException) {
+      // All IOException instances are retryable.
+      return true;

Review comment:
       UnknownHostException? NoRouteToHostException?
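   i.e. a blanket IOException check will retry failures that are unlikely to recover; a sketch of carving those out:

   ```java
   import java.net.NoRouteToHostException;
   import java.net.UnknownHostException;

   if (e instanceof UnknownHostException || e instanceof NoRouteToHostException) {
     // network failures unlikely to recover within the retry window
     return false;
   }
   ```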

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BlockManager.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Provides read access to the underlying file one block at a time.
+ *
+ * This class is the simplest form of a {@code BlockManager}; it does not
+ * perform prefetching or caching.
+ */
+public abstract class BlockManager implements Closeable {
+
+  // Information about each block of the underlying file.
+  private BlockData blockData;

Review comment:
       final

##########
File path: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
##########
@@ -1431,7 +1431,7 @@ public static boolean metadataStorePersistsAuthoritativeBit(MetadataStore ms)
    */
   public static void setMetadataStore(S3AFileSystem fs, MetadataStore ms) {
     fs.setMetadataStore(ms);
-}
+  }

Review comment:
       Skip this; we will be cutting the method soon

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import org.apache.hadoop.fs.common.Validate;
+import org.apache.hadoop.fs.common.Io;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Encapsulates low-level interactions with an S3 object on AWS.
+ */
+public class S3File implements Closeable {
+  // Client used for accessing S3 objects.
+  private final AmazonS3 client;
+
+  private final String bucket;
+  private final String key;
+
+  // Size of S3 file.
+  private long fileSize;
+
+  // Maps a stream returned by openForRead() to the associated S3 object.
+  // That allows us to close the object when closing the stream.
+  private Map<InputStream, S3Object> s3Objects;
+
+  /**
+   * Creates an instance of {@link S3File}.
+   *
+   * @param client AWS S3 client instance.
+   * @param bucket the bucket to read from.
+   * @param key the key to read from.
+   */
+  public S3File(AmazonS3 client, String bucket, String key) {
+    this(client, bucket, key, -1);
+  }
+
+  /**
+   * Creates an instance of {@link S3File}.
+   *
+   * @param client AWS S3 client instance.
+   * @param bucket the bucket to read from.
+   * @param key the key to read from.
+   * @param size the size of this file. It should be passed in if known beforehand
+   *        to avoid an additional network call. If size is not known,
+   *        pass a negative number so that size can be obtained if needed.
+   */
+  public S3File(AmazonS3 client, String bucket, String key, long size) {
+    Validate.checkNotNull(client, "client");
+    Validate.checkNotNullAndNotEmpty(bucket, "bucket");
+    Validate.checkNotNullAndNotEmpty(key, "key");
+
+    this.client = client;
+    this.bucket = bucket;
+    this.key = key;
+    this.fileSize = size;
+    this.s3Objects = new IdentityHashMap<InputStream, S3Object>();
+  }
+
+  public String getPath() {
+    return String.format("s3://%s/%s", this.bucket, this.key);

Review comment:
       s3a:

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3AccessRetryer.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import org.apache.hadoop.fs.common.Validate;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Encapsulates retry related functionality when accessing S3.
+ */
+public class S3AccessRetryer {

Review comment:
       you need to use `org.apache.hadoop.io.retry.RetryPolicy` so policies can be plugged in.
   `org.apache.hadoop.fs.s3a.Invoker` invokes lambda expressions with a retry policy; `org.apache.hadoop.fs.s3a.S3ARetryPolicy` is the policy which should suffice.
   This should be the single place where we define retry policies for S3 interaction.
   
   I think we should be able to replace this class entirely.
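   e.g. something like the following, with the signature quoted from memory; adjust to the actual Invoker API:

   ```java
   // retry an idempotent GET under the S3A retry policy wired into the invoker
   S3Object object = invoker.retry("GET " + key, path, true,
       () -> client.getObject(request));
   ```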

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/Retryer.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+/**
+ * Provides retry related functionality.
+ */
+public class Retryer {

Review comment:
       see S3AccessRetryer comments

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/Io.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Provides misc functionality related to IO.
+ */
+public final class Io {

Review comment:
       use org.apache.hadoop.io.IOUtils#cleanupWithLogger
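   i.e.

   ```java
   import org.apache.hadoop.io.IOUtils;

   // closes quietly, logging rather than propagating any failure
   IOUtils.cleanupWithLogger(LOG, inputStream);
   ```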

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
##########
@@ -3926,12 +3985,12 @@ public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
   }
 
   protected CopyFromLocalOperation.CopyFromLocalOperationCallbacks
-  createCopyFromLocalCallbacks() throws IOException {
+      createCopyFromLocalCallbacks() throws IOException {

Review comment:
       omit these

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import org.apache.hadoop.fs.common.BlockData;
+import org.apache.hadoop.fs.common.FilePosition;
+import org.apache.hadoop.fs.common.Validate;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.twitter.util.FuturePool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Provides an {@link InputStream} that allows reading from an S3 file.
+ */
+public abstract class S3InputStream extends InputStream {

Review comment:
       everything which goes wrong here MUST raise an IOException.
   The invoker coming in on the S3AReadOpContext (which should be passed down in the constructor) will do this.
   When you use it: declare the retry policy through the @Retries annotation. See S3AInputStream for the details.
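   A sketch of the pattern; the invoker, path and helper names are illustrative:

   ```java
   @Retries.RetryTranslated
   @Override
   public int read() throws IOException {
     // the invoker from the read context translates and retries SDK failures
     return invoker.retry("read", pathStr, true, () -> readByteFromBuffer());
   }
   ```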
   

##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.read;
+
+import org.apache.hadoop.fs.common.Validate;
+import org.apache.hadoop.fs.common.Io;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Encapsulates low-level interactions with an S3 object on AWS.
+ */
+public class S3File implements Closeable {
+  // Client used for accessing S3 objects.
+  private final AmazonS3 client;

Review comment:
       I'm afraid you are not going to get direct access to that client; we are moving away from that across all the code as part of a gradual relayering.
   Either use the same callback interface used by S3AInputStream or add a new one alongside it, again with audit context tracking.
   The interface will also make it easier to test, as there won't be a need for a mock S3 client any more - just different implementations of the interface.
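   i.e. something along the lines of the callbacks already used by S3AInputStream; the interface and method names here are illustrative:

   ```java
   /** Callbacks the stream uses instead of holding the raw AmazonS3 client. */
   public interface ObjectReaderCallbacks extends Closeable {

     /** Create a GET request within the current audit span. */
     GetObjectRequest newGetRequest(String key);

     /** Execute the GET; audited and instrumented by the owning filesystem. */
     S3Object getObject(GetObjectRequest request) throws IOException;
   }
   ```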




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


