This is an automated email from the ASF dual-hosted git repository.

cnauroth pushed a commit to branch HADOOP-19343
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 1b12f77c8a21d0879ea9cb7f719235fadaeb0e53
Author: Arunkumar Chacko <aruncha...@google.com>
AuthorDate: Wed Jun 18 04:21:07 2025 +0000

    HADOOP-19343: Add support for open() and rename()
    
    Closes #7742
    
    Signed-off-by: Chris Nauroth <cnaur...@apache.org>
---
 .../fs/gs/{ListFileOptions.java => Fadvise.java}   |  19 +-
 .../hadoop/fs/gs/FileAccessPatternManager.java     | 184 +++++++
 .../apache/hadoop/fs/gs/GoogleCloudStorage.java    | 289 ++++++++++
 .../fs/gs/GoogleCloudStorageClientReadChannel.java | 609 +++++++++++++++++++++
 .../hadoop/fs/gs/GoogleCloudStorageExceptions.java |  58 ++
 .../hadoop/fs/gs/GoogleCloudStorageFileSystem.java | 322 ++++++++++-
 .../hadoop/fs/gs/GoogleHadoopFSInputStream.java    | 187 +++++++
 .../hadoop/fs/gs/GoogleHadoopFileSystem.java       |  45 +-
 .../fs/gs/GoogleHadoopFileSystemConfiguration.java | 120 +++-
 .../src/main/java/org/apache/hadoop/fs/gs/Gs.java  |  64 +++
 .../org/apache/hadoop/fs/gs/ListFileOptions.java   |   4 +
 .../fs/gs/contract/ITestGoogleContractOpen.java}   |  21 +-
 .../fs/gs/contract/ITestGoogleContractRename.java  |  43 ++
 .../fs/gs/contract/ITestGoogleContractSeek.java}   |  20 +-
 14 files changed, 1928 insertions(+), 57 deletions(-)
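
For context, a minimal sketch of how the new open() and rename() paths are exercised through
the standard Hadoop FileSystem API; the bucket and object names below are illustrative and not
part of this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class GsOpenRenameExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "gs" is the scheme served by GoogleHadoopFileSystem.
        Path src = new Path("gs://example-bucket/dir/data.txt");
        try (FileSystem fs = src.getFileSystem(conf)) {
          // open() is now backed by GoogleCloudStorageClientReadChannel.
          try (FSDataInputStream in = fs.open(src)) {
            byte[] buf = new byte[16];
            in.readFully(0, buf);  // positioned read: seek + read through the new channel
          }
          // A same-bucket rename() is served by the new GCS move() path.
          fs.rename(src, new Path("gs://example-bucket/dir/renamed.txt"));
        }
      }
    }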

diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Fadvise.java
similarity index 72%
copy from hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java
copy to hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Fadvise.java
index 2bc74c6fc21..6bc82324ddf 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Fadvise.java
@@ -1,3 +1,5 @@
+package org.apache.hadoop.fs.gs;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -16,19 +18,6 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.fs.gs;
-
-import javax.annotation.Nonnull;
-
-final class ListFileOptions {
-  static final ListFileOptions OBJECTFIELDS = new ListFileOptions("bucket,name,size,updated");
-  private final String fields;
-
-  private ListFileOptions(@Nonnull String fields) {
-    this.fields = fields;
-  }
-
-  String getFields() {
-    return fields;
-  }
+enum Fadvise {
+  RANDOM, SEQUENTIAL, AUTO, AUTO_RANDOM
 }
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/FileAccessPatternManager.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/FileAccessPatternManager.java
new file mode 100644
index 00000000000..e653e2d4bf1
--- /dev/null
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/FileAccessPatternManager.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the access pattern of the object being read from cloud storage. For adaptive fadvise
+ * configurations it computes the access pattern based on previous requests.
+ */
+class FileAccessPatternManager {
+  private static final Logger LOG = LoggerFactory.getLogger(FileAccessPatternManager.class);
+  private final StorageResourceId resourceId;
+  private final GoogleHadoopFileSystemConfiguration config;
+  private final Fadvise fadvise;
+  private boolean isPatternOverriden;
+  private boolean randomAccess;
+  // keeps track of any backward seek requested in lifecycle of InputStream
+  private boolean isBackwardSeekRequested = false;
+  // keeps track of any forward seek requested in lifecycle of InputStream
+  private boolean isForwardSeekRequested = false;
+  private long lastServedIndex = -1;
+  // Number of consecutive requests made within the in-place seek distance threshold
+  private int consecutiveSequentialCount = 0;
+
+  FileAccessPatternManager(
+      StorageResourceId resourceId, GoogleHadoopFileSystemConfiguration configuration) {
+    this.isPatternOverriden = false;
+    this.resourceId = resourceId;
+    this.config = configuration;
+    this.fadvise = config.getFadvise();
+    this.randomAccess = fadvise == Fadvise.AUTO_RANDOM || fadvise == Fadvise.RANDOM;
+  }
+
+  void updateLastServedIndex(long position) {
+    this.lastServedIndex = position;
+  }
+
+  boolean shouldAdaptToRandomAccess() {
+    return randomAccess;
+  }
+
+  void updateAccessPattern(long currentPosition) {
+    if (isPatternOverriden) {
+      LOG.trace("Will bypass computing access pattern as it's overridden for resource: {}",
+          resourceId);
+      return;
+    }
+    updateSeekFlags(currentPosition);
+    if (fadvise == Fadvise.AUTO_RANDOM) {
+      if (randomAccess) {
+        if (shouldAdaptToSequential(currentPosition)) {
+          unsetRandomAccess();
+        }
+      } else {
+        if (shouldAdaptToRandomAccess(currentPosition)) {
+          setRandomAccess();
+        }
+      }
+    } else if (fadvise == Fadvise.AUTO) {
+      if (shouldAdaptToRandomAccess(currentPosition)) {
+        setRandomAccess();
+      }
+    }
+  }
+
+  /**
+   * Provides a way to override the access pattern; once overridden it will not be
+   * recomputed for adaptive fadvise types.
+   *
+   * @param isRandomPattern true to override with random access, false otherwise
+   */
+  void overrideAccessPattern(boolean isRandomPattern) {
+    this.isPatternOverriden = true;
+    this.randomAccess = isRandomPattern;
+    LOG.trace(
+        "Overriding the random access pattern to {} for fadvise: {} for resource: {}",
+        isRandomPattern, fadvise, resourceId);
+  }
+
+  private boolean shouldAdaptToSequential(long currentPosition) {
+    if (lastServedIndex != -1) {
+      long distance = currentPosition - lastServedIndex;
+      if (distance < 0 || distance > config.getInplaceSeekLimit()) {
+        consecutiveSequentialCount = 0;
+      } else {
+        consecutiveSequentialCount++;
+      }
+    }
+
+    if (!shouldDetectSequentialAccess()) {
+      return false;
+    }
+
+    if (consecutiveSequentialCount < config.getFadviseRequestTrackCount()) {
+      return false;
+    }
+    LOG.trace(
+        "Detected {} consecutive read requests within distance threshold {} with fadvise: {}, "
+            + "switching to sequential IO for '{}'",
+        consecutiveSequentialCount,
+        config.getInplaceSeekLimit(),
+        fadvise,
+        resourceId);
+    return true;
+  }
+
+  private boolean shouldAdaptToRandomAccess(long currentPosition) {
+    if (!shouldDetectRandomAccess()) {
+      return false;
+    }
+    if (lastServedIndex == -1) {
+      return false;
+    }
+
+    if (isBackwardOrForwardSeekRequested()) {
+      LOG.trace(
+          "Backward or forward seek requested, isBackwardSeek: {}, isForwardSeek: {} for '{}'",
+          isBackwardSeekRequested, isForwardSeekRequested, resourceId);
+      return true;
+    }
+    return false;
+  }
+
+  private boolean shouldDetectSequentialAccess() {
+    return randomAccess
+        && !isBackwardOrForwardSeekRequested()
+        && consecutiveSequentialCount >= config.getFadviseRequestTrackCount()
+        && fadvise == Fadvise.AUTO_RANDOM;
+  }
+
+  private boolean shouldDetectRandomAccess() {
+    return !randomAccess && (fadvise == Fadvise.AUTO || fadvise == Fadvise.AUTO_RANDOM);
+  }
+
+  private void setRandomAccess() {
+    randomAccess = true;
+  }
+
+  private void unsetRandomAccess() {
+    randomAccess = false;
+  }
+
+  private boolean isBackwardOrForwardSeekRequested() {
+    return isBackwardSeekRequested || isForwardSeekRequested;
+  }
+
+  private void updateSeekFlags(long currentPosition) {
+    if (lastServedIndex == -1) {
+      return;
+    }
+
+    if (currentPosition < lastServedIndex) {
+      isBackwardSeekRequested = true;
+      LOG.trace(
+          "Detected backward read from {} to {} position, updating to backwardSeek for '{}'",
+          lastServedIndex, currentPosition, resourceId);
+
+    } else if (lastServedIndex + config.getInplaceSeekLimit() < currentPosition) {
+      isForwardSeekRequested = true;
+      LOG.trace(
+          "Detected forward read from {} to {} position over {} threshold,"
+              + " updated to forwardSeek for '{}'",
+          lastServedIndex, currentPosition, config.getInplaceSeekLimit(), resourceId);
+    }
+  }
+}
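
To illustrate the adaptive behaviour implemented above, here is a minimal sketch of the call
sequence the read channel performs against this class (not a standalone program; resourceId and
configuration are assumed to be available, as they are in ContentReadChannel below):

    // Same package; constructed once per open channel, as in ContentReadChannel.
    FileAccessPatternManager manager = new FileAccessPatternManager(resourceId, configuration);

    // Before opening each new range request:
    manager.updateAccessPattern(currentPosition);
    boolean random = manager.shouldAdaptToRandomAccess();
    // random == true  -> open a small range sized around the requested bytes
    // random == false -> open a larger (or whole-object) range

    // When a content channel is closed, remember where it stopped serving:
    manager.updateLastServedIndex(contentChannelCurrentPosition);

With AUTO_RANDOM the manager starts in random mode and switches to sequential once
getFadviseRequestTrackCount() consecutive reads stay within getInplaceSeekLimit() of the
previous request; with AUTO it starts sequential and switches to random after a backward seek
or a forward seek beyond that limit is observed.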
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java
index d68eca6a8a5..89a86eef8ff 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java
@@ -21,6 +21,7 @@
 import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.*;
 import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.isNullOrEmpty;
 import static java.lang.Math.toIntExact;
+import static org.apache.hadoop.fs.gs.GoogleCloudStorageExceptions.createFileNotFoundException;
 
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.ExponentialBackOff;
@@ -34,7 +35,9 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.channels.SeekableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.FileAlreadyExistsException;
 import java.time.Duration;
@@ -582,6 +585,292 @@ private List<Bucket> listBucketsInternal() throws IOException {
     return allBuckets;
   }
 
+  public SeekableByteChannel open(GoogleCloudStorageItemInfo itemInfo,
+      GoogleHadoopFileSystemConfiguration config) throws IOException {
+    LOG.trace("open({})", itemInfo);
+    checkNotNull(itemInfo, "itemInfo should not be null");
+
+    StorageResourceId resourceId = itemInfo.getResourceId();
+    checkArgument(
+        resourceId.isStorageObject(), "Expected full StorageObject id, got %s", resourceId);
+
+    return open(resourceId, itemInfo, config);
+  }
+
+  private SeekableByteChannel open(
+      StorageResourceId resourceId,
+      GoogleCloudStorageItemInfo itemInfo,
+      GoogleHadoopFileSystemConfiguration config)
+      throws IOException {
+    return new GoogleCloudStorageClientReadChannel(
+        storage,
+        itemInfo == null ? getItemInfo(resourceId) : itemInfo,
+        config);
+  }
+
+  public void move(Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap)
+      throws IOException {
+    validateMoveArguments(sourceToDestinationObjectsMap);
+
+    if (sourceToDestinationObjectsMap.isEmpty()) {
+      return;
+    }
+
+    for (Map.Entry<StorageResourceId, StorageResourceId> entry :
+        sourceToDestinationObjectsMap.entrySet()) {
+      StorageResourceId srcObject = entry.getKey();
+      StorageResourceId dstObject = entry.getValue();
+      // TODO: Do this concurrently
+      moveInternal(
+          srcObject.getBucketName(),
+          srcObject.getGenerationId(),
+          srcObject.getObjectName(),
+          dstObject.getGenerationId(),
+          dstObject.getObjectName());
+    }
+  }
+
+  private void moveInternal(
+      String srcBucketName,
+      long srcContentGeneration,
+      String srcObjectName,
+      long dstContentGeneration,
+      String dstObjectName) throws IOException {
+    Storage.MoveBlobRequest.Builder moveRequestBuilder =
+        createMoveRequestBuilder(
+            srcBucketName,
+            srcObjectName,
+            dstObjectName,
+            srcContentGeneration,
+            dstContentGeneration);
+    try {
+      String srcString = StringPaths.fromComponents(srcBucketName, srcObjectName);
+      String dstString = StringPaths.fromComponents(srcBucketName, dstObjectName);
+
+      Blob movedBlob = storage.moveBlob(moveRequestBuilder.build());
+      if (movedBlob != null) {
+        LOG.trace("Successfully moved {} to {}", srcString, dstString);
+      }
+    } catch (StorageException e) {
+      if (ErrorTypeExtractor.getErrorType(e) == ErrorTypeExtractor.ErrorType.NOT_FOUND) {
+        throw createFileNotFoundException(srcBucketName, srcObjectName, new IOException(e));
+      } else {
+        throw
+            new IOException(
+                String.format(
+                    "Error moving '%s'",
+                    StringPaths.fromComponents(srcBucketName, srcObjectName)),
+                e);
+      }
+    }
+  }
+
+  /** Creates a builder for a blob move request. */
+  private Storage.MoveBlobRequest.Builder createMoveRequestBuilder(
+      String srcBucketName,
+      String srcObjectName,
+      String dstObjectName,
+      long srcContentGeneration,
+      long dstContentGeneration) {
+
+    Storage.MoveBlobRequest.Builder moveRequestBuilder =
+        Storage.MoveBlobRequest.newBuilder().setSource(BlobId.of(srcBucketName, srcObjectName));
+    moveRequestBuilder.setTarget(BlobId.of(srcBucketName, dstObjectName));
+
+    List<Storage.BlobTargetOption> blobTargetOptions = new ArrayList<>();
+    List<Storage.BlobSourceOption> blobSourceOptions = new ArrayList<>();
+
+    if (srcContentGeneration != StorageResourceId.UNKNOWN_GENERATION_ID) {
+      blobSourceOptions.add(Storage.BlobSourceOption.generationMatch(srcContentGeneration));
+    }
+
+    if (dstContentGeneration != StorageResourceId.UNKNOWN_GENERATION_ID) {
+      blobTargetOptions.add(Storage.BlobTargetOption.generationMatch(dstContentGeneration));
+    }
+
+    // TODO: Add encryption support
+
+    moveRequestBuilder.setSourceOptions(blobSourceOptions);
+    moveRequestBuilder.setTargetOptions(blobTargetOptions);
+
+    return moveRequestBuilder;
+  }
+
+  /**
+   * Validates basic argument constraints like non-null, non-empty Strings, using {@code
+   * Preconditions} in addition to checking for src/dst bucket equality.
+   */
+  public static void validateMoveArguments(
+      Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap) throws IOException {
+    checkNotNull(sourceToDestinationObjectsMap, "srcObjects must not be null");
+
+    if (sourceToDestinationObjectsMap.isEmpty()) {
+      return;
+    }
+
+    for (Map.Entry<StorageResourceId, StorageResourceId> entry :
+        sourceToDestinationObjectsMap.entrySet()) {
+      StorageResourceId source = entry.getKey();
+      StorageResourceId destination = entry.getValue();
+      String srcBucketName = source.getBucketName();
+      String dstBucketName = destination.getBucketName();
+      // Avoid move across buckets.
+      if (!srcBucketName.equals(dstBucketName)) {
+        throw new UnsupportedOperationException(
+            "This operation is not supported across two different buckets.");
+      }
+      checkArgument(
+          !isNullOrEmpty(source.getObjectName()), "srcObjectName must not be null or empty");
+      checkArgument(
+          !isNullOrEmpty(destination.getObjectName()), "dstObjectName must not be null or empty");
+      if (srcBucketName.equals(dstBucketName)
+          && source.getObjectName().equals(destination.getObjectName())) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Move destination must be different from source for %s.",
+                StringPaths.fromComponents(srcBucketName, source.getObjectName())));
+      }
+    }
+  }
+
+  void copy(Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap)
+      throws IOException {
+    validateCopyArguments(sourceToDestinationObjectsMap, this);
+
+    if (sourceToDestinationObjectsMap.isEmpty()) {
+      return;
+    }
+
+    for (Map.Entry<StorageResourceId, StorageResourceId> entry :
+        sourceToDestinationObjectsMap.entrySet()) {
+      StorageResourceId srcObject = entry.getKey();
+      StorageResourceId dstObject = entry.getValue();
+      // TODO: Do this concurrently
+      copyInternal(
+          srcObject.getBucketName(),
+          srcObject.getObjectName(),
+          dstObject.getGenerationId(),
+          dstObject.getBucketName(),
+          dstObject.getObjectName());
+    }
+  }
+
+  private void copyInternal(
+      String srcBucketName,
+      String srcObjectName,
+      long dstContentGeneration,
+      String dstBucketName,
+      String dstObjectName) throws IOException {
+    Storage.CopyRequest.Builder copyRequestBuilder =
+        Storage.CopyRequest.newBuilder().setSource(BlobId.of(srcBucketName, srcObjectName));
+    if (dstContentGeneration != StorageResourceId.UNKNOWN_GENERATION_ID) {
+      copyRequestBuilder.setTarget(
+          BlobId.of(dstBucketName, dstObjectName),
+          Storage.BlobTargetOption.generationMatch(dstContentGeneration));
+    } else {
+      copyRequestBuilder.setTarget(BlobId.of(dstBucketName, dstObjectName));
+    }
+
+    // TODO: Add support for encryption key
+    if (configuration.getMaxRewriteChunkSize() > 0) {
+      copyRequestBuilder.setMegabytesCopiedPerChunk(
+          // Convert raw byte size into MiB.
+          configuration.getMaxRewriteChunkSize() / (1024 * 1024));
+    }
+
+    String srcString = StringPaths.fromComponents(srcBucketName, srcObjectName);
+    String dstString = StringPaths.fromComponents(dstBucketName, dstObjectName);
+
+    try {
+      CopyWriter copyWriter = storage.copy(copyRequestBuilder.build());
+      while (!copyWriter.isDone()) {
+        copyWriter.copyChunk();
+        LOG.trace(
+            "Copy ({} to {}) did not complete. Resuming...", srcString, dstString);
+      }
+      LOG.trace("Successfully copied {} to {}", srcString, dstString);
+    } catch (StorageException e) {
+      if (ErrorTypeExtractor.getErrorType(e) == ErrorTypeExtractor.ErrorType.NOT_FOUND) {
+        throw createFileNotFoundException(srcBucketName, srcObjectName, new IOException(e));
+      } else {
+        throw new IOException(String.format("copy(%s->%s) failed.", srcString, dstString), e);
+      }
+    }
+  }
+
+  public static void validateCopyArguments(
+      Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap,
+      GoogleCloudStorage gcsImpl)
+      throws IOException {
+    checkNotNull(sourceToDestinationObjectsMap, "srcObjects must not be null");
+
+    if (sourceToDestinationObjectsMap.isEmpty()) {
+      return;
+    }
+
+    Map<StorageResourceId, GoogleCloudStorageItemInfo> bucketInfoCache = new HashMap<>();
+
+    for (Map.Entry<StorageResourceId, StorageResourceId> entry :
+        sourceToDestinationObjectsMap.entrySet()) {
+      StorageResourceId source = entry.getKey();
+      StorageResourceId destination = entry.getValue();
+      String srcBucketName = source.getBucketName();
+      String dstBucketName = destination.getBucketName();
+      // Avoid copy across locations or storage classes.
+      if (!srcBucketName.equals(dstBucketName)) {
+        StorageResourceId srcBucketResourceId = new StorageResourceId(srcBucketName);
+        GoogleCloudStorageItemInfo srcBucketInfo =
+            getGoogleCloudStorageItemInfo(gcsImpl, bucketInfoCache, srcBucketResourceId);
+        if (!srcBucketInfo.exists()) {
+          throw new FileNotFoundException("Bucket not found: " + srcBucketName);
+        }
+
+        StorageResourceId dstBucketResourceId = new StorageResourceId(dstBucketName);
+        GoogleCloudStorageItemInfo dstBucketInfo =
+            getGoogleCloudStorageItemInfo(gcsImpl, bucketInfoCache, dstBucketResourceId);
+        if (!dstBucketInfo.exists()) {
+          throw new FileNotFoundException("Bucket not found: " + dstBucketName);
+        }
+
+        // TODO: Restrict this only when copy-with-rewrite is enabled
+        if (!srcBucketInfo.getLocation().equals(dstBucketInfo.getLocation())) {
+          throw new UnsupportedOperationException(
+              "This operation is not supported across two different storage locations.");
+        }
+
+        if (!srcBucketInfo.getStorageClass().equals(dstBucketInfo.getStorageClass())) {
+          throw new UnsupportedOperationException(
+              "This operation is not supported across two different storage classes.");
+        }
+      }
+      checkArgument(
+          !isNullOrEmpty(source.getObjectName()), "srcObjectName must not be null or empty");
+      checkArgument(
+          !isNullOrEmpty(destination.getObjectName()), "dstObjectName must not be null or empty");
+      if (srcBucketName.equals(dstBucketName)
+          && source.getObjectName().equals(destination.getObjectName())) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Copy destination must be different from source for %s.",
+                StringPaths.fromComponents(srcBucketName, source.getObjectName())));
+      }
+    }
+  }
+
+  private static GoogleCloudStorageItemInfo getGoogleCloudStorageItemInfo(
+      GoogleCloudStorage gcsImpl,
+      Map<StorageResourceId, GoogleCloudStorageItemInfo> bucketInfoCache,
+      StorageResourceId resourceId)
+      throws IOException {
+    GoogleCloudStorageItemInfo storageItemInfo = bucketInfoCache.get(resourceId);
+    if (storageItemInfo != null) {
+      return storageItemInfo;
+    }
+    storageItemInfo = gcsImpl.getItemInfo(resourceId);
+    bucketInfoCache.put(resourceId, storageItemInfo);
+    return storageItemInfo;
+  }
+
   // Helper class to capture the results of list operation.
   private class ListOperationResult {
     private final Map<String, Blob> prefixes = new HashMap<>();
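
The copyInternal() method above uses the java-storage resumable copy loop (CopyWriter). A
minimal standalone sketch of that pattern, assuming default credentials and illustrative
bucket/object names:

    import com.google.cloud.storage.BlobId;
    import com.google.cloud.storage.CopyWriter;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;

    public final class ResumableCopyExample {
      public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        Storage.CopyRequest request =
            Storage.CopyRequest.newBuilder()
                .setSource(BlobId.of("example-bucket", "src/object.txt"))
                .setTarget(BlobId.of("example-bucket", "dst/object.txt"))
                .build();
        CopyWriter copyWriter = storage.copy(request);
        // Large objects are rewritten in chunks; keep resuming until the copy completes.
        while (!copyWriter.isDone()) {
          copyWriter.copyChunk();
        }
        System.out.println("Copied to " + copyWriter.getResult().getBlobId());
      }
    }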
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageClientReadChannel.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageClientReadChannel.java
new file mode 100644
index 00000000000..afe16c66701
--- /dev/null
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageClientReadChannel.java
@@ -0,0 +1,609 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs;
+
+import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkState;
+import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.nullToEmpty;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static java.lang.Math.toIntExact;
+import static org.apache.hadoop.fs.gs.GoogleCloudStorageExceptions.createFileNotFoundException;
+
+import com.google.cloud.ReadChannel;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.Storage.BlobSourceOption;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/** Provides seekable read access to GCS via java-storage library. */
+class GoogleCloudStorageClientReadChannel implements SeekableByteChannel {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GoogleCloudStorageClientReadChannel.class);
+  private static final String GZIP_ENCODING = "gzip";
+
+  private final StorageResourceId resourceId;
+  private final Storage storage;
+  private final GoogleHadoopFileSystemConfiguration config;
+
+  // The size of this object generation, in bytes.
+  private long objectSize;
+  private ContentReadChannel contentReadChannel;
+  private boolean gzipEncoded = false;
+  private boolean open = true;
+
+  // Current position in this channel; it can differ from contentChannelCurrentPosition if
+  // position(long) calls were made without corresponding read(ByteBuffer) calls.
+  private long currentPosition = 0;
+
+  GoogleCloudStorageClientReadChannel(
+      Storage storage,
+      GoogleCloudStorageItemInfo itemInfo,
+      GoogleHadoopFileSystemConfiguration config)
+      throws IOException {
+    validate(itemInfo);
+    this.storage = storage;
+    this.resourceId =
+        new StorageResourceId(
+            itemInfo.getBucketName(), itemInfo.getObjectName(), itemInfo.getContentGeneration());
+    this.config = config;
+    this.contentReadChannel = new ContentReadChannel(config, resourceId);
+    initMetadata(itemInfo.getContentEncoding(), itemInfo.getSize());
+  }
+
+  protected void initMetadata(@Nullable String encoding, long sizeFromMetadata) throws IOException {
+    gzipEncoded = nullToEmpty(encoding).contains(GZIP_ENCODING);
+    if (gzipEncoded && !config.isGzipEncodingSupportEnabled()) {
+      throw new IOException(
+          "Cannot read GZIP encoded files - content encoding support is disabled.");
+    }
+    objectSize = gzipEncoded ? Long.MAX_VALUE : sizeFromMetadata;
+  }
+
+  @Override
+  public int read(ByteBuffer dst) throws IOException {
+    throwIfNotOpen();
+
+    // Don't try to read if the buffer has no space.
+    if (dst.remaining() == 0) {
+      return 0;
+    }
+    LOG.trace(
+        "Reading {} bytes at {} position from '{}'", dst.remaining(), currentPosition, resourceId);
+    if (currentPosition == objectSize) {
+      return -1;
+    }
+    return contentReadChannel.readContent(dst);
+  }
+
+  @Override
+  public int write(ByteBuffer src) throws IOException {
+    throw new UnsupportedOperationException("Cannot mutate read-only channel");
+  }
+
+  @Override
+  public long position() throws IOException {
+    return currentPosition;
+  }
+
+  /**
+   * Sets this channel's position.
+   *
+   * <p>This method will throw an exception if {@code newPosition} is greater than the object size,
+   * which contradicts the {@link SeekableByteChannel#position(long) SeekableByteChannel} contract.
+   * TODO(user): decide if this needs to be fixed.
+   *
+   * @param newPosition the new position, counting the number of bytes from the beginning.
+   * @return this channel instance
+   * @throws FileNotFoundException if the underlying object does not exist.
+   * @throws IOException on IO error
+   */
+  @Override
+  public SeekableByteChannel position(long newPosition) throws IOException {
+    throwIfNotOpen();
+
+    if (newPosition == currentPosition) {
+      return this;
+    }
+
+    validatePosition(newPosition);
+    LOG.trace(
+        "Seek from {} to {} position for '{}'", currentPosition, newPosition, resourceId);
+    currentPosition = newPosition;
+    return this;
+  }
+
+  @Override
+  public long size() throws IOException {
+    return objectSize;
+  }
+
+  @Override
+  public SeekableByteChannel truncate(long size) throws IOException {
+    throw new UnsupportedOperationException("Cannot mutate read-only channel");
+  }
+
+  @Override
+  public boolean isOpen() {
+    return open;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (open) {
+      try {
+        LOG.trace("Closing channel for '{}'", resourceId);
+        contentReadChannel.closeContentChannel();
+      } catch (Exception e) {
+        throw new IOException(
+            String.format("Exception occurred while closing channel '%s'", resourceId), e);
+      } finally {
+        contentReadChannel = null;
+        open = false;
+      }
+    }
+  }
+
+  /**
+   * This class owns the responsibility of opening the contentChannel. It also implements fadvise,
+   * which helps decide the boundaries of the content channel being opened, and caches the
+   * footer of an object.
+   */
+  private class ContentReadChannel {
+
+    // Size of the buffer used to skip (discard) bytes when performing in-place seeks.
+    private static final int SKIP_BUFFER_SIZE = 8192;
+    private final BlobId blobId;
+
+    // This is the actual current position in `contentChannel` from where a read can happen.
+    // It is not affected by position(long) calls.
+    private long contentChannelCurrentPosition = -1;
+    private long contentChannelEnd = -1;
+    // Prefetched footer content.
+    private byte[] footerContent;
+    // Used as scratch space when reading bytes just to discard them when trying to perform small
+    // in-place seeks.
+    private byte[] skipBuffer = null;
+    private ReadableByteChannel byteChannel = null;
+    private final FileAccessPatternManager fileAccessManager;
+
+    ContentReadChannel(
+        GoogleHadoopFileSystemConfiguration config, StorageResourceId resourceId) {
+      this.blobId =
+          BlobId.of(
+              resourceId.getBucketName(), resourceId.getObjectName(), resourceId.getGenerationId());
+      this.fileAccessManager = new FileAccessPatternManager(resourceId, config);
+      if (gzipEncoded) {
+        fileAccessManager.overrideAccessPattern(false);
+      }
+    }
+
+    int readContent(ByteBuffer dst) throws IOException {
+
+      performPendingSeeks();
+
+      checkState(
+          contentChannelCurrentPosition == currentPosition || byteChannel == null,
+          "contentChannelCurrentPosition (%s) should be equal to currentPosition "
+              + "(%s) after lazy seek, if channel is open",
+          contentChannelCurrentPosition,
+          currentPosition);
+
+      int totalBytesRead = 0;
+      // We read from a streaming source. We may not get all the bytes we asked for
+      // in the first read. Therefore, loop till we either read the required number of
+      // bytes or we reach end-of-stream.
+      while (dst.hasRemaining()) {
+        int remainingBeforeRead = dst.remaining();
+        try {
+          if (byteChannel == null) {
+            byteChannel = openByteChannel(dst.remaining());
+            // We adjust the start index of the content channel in the following cases:
+            // 1. the request range is within the footer boundaries --> request the whole footer
+            // 2. the requested content is gzip encoded --> always request from the start of the file.
+            // Case (1) is handled by reading and caching the extra bytes; for all other cases
+            // we need to skip all the unrequested bytes before reading from the current position.
+            if (currentPosition > contentChannelCurrentPosition) {
+              skipInPlace();
+            }
+            // Make sure that currentPosition is aligned with contentChannelCurrentPosition before
+            // the actual read starts, to avoid read discrepancies.
+            checkState(
+                contentChannelCurrentPosition == currentPosition,
+                "position of read offset isn't in alignment with channel's read offset");
+          }
+          int bytesRead = byteChannel.read(dst);
+
+          /*
+          Because we use the zero-copy implementation of byteChannel, a read can
+          legitimately return zero bytes; this should not be treated as an error.
+          */
+          if (bytesRead == 0) {
+            LOG.trace(
+                "Read {} from storage-client's byte channel at position: {} with channel "
+                    + "ending at: {} for resourceId: {} of size: {}",
+                bytesRead, currentPosition, contentChannelEnd, resourceId, objectSize);
+          }
+
+          if (bytesRead < 0) {
+            // Because we don't know the decompressed object size for gzip-encoded objects,
+            // assume that this is the object end.
+            if (gzipEncoded) {
+              objectSize = currentPosition;
+              contentChannelEnd = currentPosition;
+            }
+
+            if (currentPosition != contentChannelEnd && currentPosition != objectSize) {
+              throw new IOException(
+                  String.format(
+                      "Received end of stream result before all requested bytes were received; "
+                          + "end of stream signal received at offset: %d whereas stream was "
+                          + "supposed to end at: %d for resource: %s of size: %d",
+                      currentPosition, contentChannelEnd, resourceId, objectSize));
+            }
+            // If we have reached the end of the contentChannel but not the end of the object,
+            // then close the contentChannel and continue reading the object if necessary.
+            if (contentChannelEnd != objectSize && currentPosition == contentChannelEnd) {
+              closeContentChannel();
+              continue;
+            } else {
+              break;
+            }
+          }
+          totalBytesRead += bytesRead;
+          currentPosition += bytesRead;
+          contentChannelCurrentPosition += bytesRead;
+          checkState(
+              contentChannelCurrentPosition == currentPosition,
+              "contentChannelPosition (%s) should be equal to currentPosition (%s)"
+                  + " after successful read",
+              contentChannelCurrentPosition,
+              currentPosition);
+        } catch (Exception e) {
+          int partialBytes = partiallyReadBytes(remainingBeforeRead, dst);
+          currentPosition += partialBytes;
+          contentChannelCurrentPosition += partialBytes;
+          LOG.trace(
+              "Closing contentChannel after {} exception for '{}'.", e.getMessage(), resourceId);
+          closeContentChannel();
+          throw convertError(e);
+        }
+      }
+      return totalBytesRead;
+    }
+
+    private int partiallyReadBytes(int remainingBeforeRead, ByteBuffer dst) {
+      int partialReadBytes = 0;
+      if (remainingBeforeRead != dst.remaining()) {
+        partialReadBytes = remainingBeforeRead - dst.remaining();
+      }
+      return partialReadBytes;
+    }
+
+    private ReadableByteChannel openByteChannel(long bytesToRead) throws IOException {
+      checkArgument(
+          bytesToRead > 0, "bytesToRead should be greater than 0, but was %s", bytesToRead);
+      checkState(
+          byteChannel == null && contentChannelEnd < 0,
+          "contentChannel and contentChannelEnd should not be initialized yet for '%s'",
+          resourceId);
+
+      if (footerContent != null && currentPosition >= objectSize - footerContent.length) {
+        return serveFooterContent();
+      }
+
+      // Should be updated only if content is not served from cached footer
+      fileAccessManager.updateAccessPattern(currentPosition);
+
+      setChannelBoundaries(bytesToRead);
+
+      ReadableByteChannel readableByteChannel =
+          getStorageReadChannel(contentChannelCurrentPosition, contentChannelEnd);
+
+      if (contentChannelEnd == objectSize
+          && (contentChannelEnd - contentChannelCurrentPosition)
+              <= config.getMinRangeRequestSize()) {
+
+        if (footerContent == null) {
+          cacheFooter(readableByteChannel);
+        }
+        return serveFooterContent();
+      }
+      return readableByteChannel;
+    }
+
+    private void setChannelBoundaries(long bytesToRead) {
+      contentChannelCurrentPosition = getRangeRequestStart();
+      contentChannelEnd = getRangeRequestEnd(contentChannelCurrentPosition, bytesToRead);
+      checkState(
+          contentChannelEnd >= contentChannelCurrentPosition,
+          String.format(
+              "Start position should be <= endPosition; startPosition: %d, endPosition: %d",
+              contentChannelCurrentPosition, contentChannelEnd));
+    }
+
+    private void cacheFooter(ReadableByteChannel readableByteChannel) throws IOException {
+      int footerSize = toIntExact(objectSize - contentChannelCurrentPosition);
+      footerContent = new byte[footerSize];
+      try (InputStream footerStream = Channels.newInputStream(readableByteChannel)) {
+        int totalBytesRead = 0;
+        int bytesRead;
+        do {
+          bytesRead = footerStream.read(footerContent, totalBytesRead, footerSize - totalBytesRead);
+          if (bytesRead >= 0) {
+            totalBytesRead += bytesRead;
+          }
+        } while (bytesRead >= 0 && totalBytesRead < footerSize);
+        checkState(
+            bytesRead >= 0,
+            "footerStream shouldn't be empty before reading the footer of size %s, "
+                + "totalBytesRead %s, read via last call %s, for '%s'",
+            footerSize,
+            totalBytesRead,
+            bytesRead,
+            resourceId);
+        checkState(
+            totalBytesRead == footerSize,
+            "totalBytesRead (%s) should equal footerSize (%s) for '%s'",
+            totalBytesRead,
+            footerSize,
+            resourceId);
+      } catch (Exception e) {
+        footerContent = null;
+        throw e;
+      }
+      LOG.trace("Prefetched {} bytes footer for '{}'", footerContent.length, resourceId);
+    }
+
+    private ReadableByteChannel serveFooterContent() {
+      contentChannelCurrentPosition = currentPosition;
+      int offset = toIntExact(currentPosition - (objectSize - footerContent.length));
+      int length = footerContent.length - offset;
+      LOG.trace(
+          "Opened channel (prefetched footer) from {} position for '{}'",
+          currentPosition, resourceId);
+      return Channels.newChannel(new ByteArrayInputStream(footerContent, 
offset, length));
+    }
+
+    private long getRangeRequestStart() {
+      if (gzipEncoded) {
+        return 0;
+      }
+      if (config.getFadvise() != Fadvise.SEQUENTIAL
+          && isFooterRead()
+          && !config.isReadExactRequestedBytesEnabled()) {
+        // Prefetch footer and adjust start position to footerStart.
+        return max(0, objectSize - config.getMinRangeRequestSize());
+      }
+      return currentPosition;
+    }
+
+    private long getRangeRequestEnd(long startPosition, long bytesToRead) {
+      // Always read gzip-encoded files till the end - they do not support range reads.
+      if (gzipEncoded) {
+        return objectSize;
+      }
+      long endPosition = objectSize;
+      if (fileAccessManager.shouldAdaptToRandomAccess()) {
+        // Opening a channel for the whole object doesn't make sense, as it will not be utilized
+        // for further reads.
+        endPosition = startPosition + max(bytesToRead, config.getMinRangeRequestSize());
+      } else {
+        if (config.getFadvise() == Fadvise.AUTO_RANDOM) {
+          endPosition = min(startPosition + config.getBlockSize(), objectSize);
+        }
+      }
+
+      if (footerContent != null) {
+        // If the footer is cached, open only up to the footer start.
+        // The remaining content will be served from the cached footer itself.
+        endPosition = min(endPosition, objectSize - footerContent.length);
+      }
+      return endPosition;
+    }
+
+    void closeContentChannel() {
+      if (byteChannel != null) {
+        LOG.trace("Closing internal contentChannel for '{}'", resourceId);
+        try {
+          byteChannel.close();
+        } catch (Exception e) {
+          LOG.trace(
+              "Got an exception on contentChannel.close() for '{}'; ignoring it.", resourceId, e);
+        } finally {
+          byteChannel = null;
+          fileAccessManager.updateLastServedIndex(contentChannelCurrentPosition);
+          reset();
+        }
+      }
+    }
+
+    private void reset() {
+      checkState(byteChannel == null, "contentChannel should be null for '%s'", resourceId);
+      contentChannelCurrentPosition = -1;
+      contentChannelEnd = -1;
+    }
+
+    private boolean isInRangeSeek() {
+      long seekDistance = currentPosition - contentChannelCurrentPosition;
+      if (byteChannel != null
+          && seekDistance > 0
+          // for gzip encoded content always seek in place
+          && (gzipEncoded || seekDistance <= config.getInplaceSeekLimit())
+          && currentPosition < contentChannelEnd) {
+        return true;
+      }
+      return false;
+    }
+
+    private void skipInPlace() {
+      if (skipBuffer == null) {
+        skipBuffer = new byte[SKIP_BUFFER_SIZE];
+      }
+      long seekDistance = currentPosition - contentChannelCurrentPosition;
+      while (seekDistance > 0 && byteChannel != null) {
+        try {
+          int bufferSize = toIntExact(min(skipBuffer.length, seekDistance));
+          int bytesRead = byteChannel.read(ByteBuffer.wrap(skipBuffer, 0, bufferSize));
+          if (bytesRead < 0) {
+            LOG.info(
+                "Somehow read {} bytes trying to skip {} bytes to seek to position {}, size: {}",
+                bytesRead, seekDistance, currentPosition, objectSize);
+            closeContentChannel();
+          } else {
+            seekDistance -= bytesRead;
+            contentChannelCurrentPosition += bytesRead;
+          }
+        } catch (Exception e) {
+          LOG.info(
+              "Got an IO exception on contentChannel.read(), a lazy-seek will be pending for '{}'",
+              resourceId, e);
+          closeContentChannel();
+        }
+      }
+      checkState(
+          byteChannel == null || contentChannelCurrentPosition == currentPosition,
+          "contentChannelPosition (%s) should be equal to currentPosition (%s)"
+              + " after successful in-place skip",
+          contentChannelCurrentPosition,
+          currentPosition);
+    }
+
+    private void performPendingSeeks() {
+
+      // Return quickly if there is no pending seek operation, i.e. position didn't change.
+      if (currentPosition == contentChannelCurrentPosition && byteChannel != null) {
+        return;
+      }
+
+      LOG.trace(
+          "Performing lazySeek from {} to {} position '{}'",
+          contentChannelCurrentPosition, currentPosition, resourceId);
+
+      if (isInRangeSeek()) {
+        skipInPlace();
+      } else {
+        // close existing contentChannel as requested bytes can't be served from current
+        // contentChannel;
+        closeContentChannel();
+      }
+    }
+
+    private ReadableByteChannel getStorageReadChannel(long seek, long limit) throws IOException {
+      ReadChannel readChannel = storage.reader(blobId, generateReadOptions());
+      try {
+        readChannel.seek(seek);
+        readChannel.limit(limit);
+        // Bypass the storage-client caching layer; this eliminates the need to maintain a copy
+        // of the chunk.
+        readChannel.setChunkSize(0);
+        return readChannel;
+      } catch (Exception e) {
+        throw new IOException(
+            String.format(
+                "Unable to update the boundaries/Range of contentChannel %s",
+                resourceId.toString()),
+            e);
+      }
+    }
+
+    private BlobSourceOption[] generateReadOptions() {
+      List<BlobSourceOption> blobReadOptions = new ArrayList<>();
+      // To get decoded content
+      blobReadOptions.add(BlobSourceOption.shouldReturnRawInputStream(false));
+
+      if (blobId.getGeneration() != null) {
+        blobReadOptions.add(BlobSourceOption.generationMatch(blobId.getGeneration()));
+      }
+
+      // TODO: Add support for encryptionKey
+      return blobReadOptions.toArray(new BlobSourceOption[blobReadOptions.size()]);
+    }
+
+    private boolean isFooterRead() {
+      return objectSize - currentPosition <= config.getMinRangeRequestSize();
+    }
+  }
+
+  private static void validate(GoogleCloudStorageItemInfo itemInfo) throws IOException {
+    checkNotNull(itemInfo, "itemInfo cannot be null");
+    StorageResourceId resourceId = itemInfo.getResourceId();
+    checkArgument(
+        resourceId.isStorageObject(), "Can not open a non-file object for read: %s", resourceId);
+    if (!itemInfo.exists()) {
+      throw new FileNotFoundException(String.format("Item not found: %s", resourceId));
+    }
+  }
+
+  private IOException convertError(Exception error) {
+    String msg = String.format("Error reading '%s'", resourceId);
+    switch (ErrorTypeExtractor.getErrorType(error)) {
+    case NOT_FOUND:
+      return createFileNotFoundException(
+          resourceId.getBucketName(), resourceId.getObjectName(), new IOException(msg, error));
+    case OUT_OF_RANGE:
+      return (IOException) new EOFException(msg).initCause(error);
+    default:
+      return new IOException(msg, error);
+    }
+  }
+
+  /** Validates that the given position is valid for this channel. */
+  private void validatePosition(long position) throws IOException {
+    if (position < 0) {
+      throw new EOFException(
+          String.format(
+              "Invalid seek offset: position value (%d) must be >= 0 for '%s'",
+              position, resourceId));
+    }
+
+    if (objectSize >= 0 && position >= objectSize) {
+      throw new EOFException(
+          String.format(
+              "Invalid seek offset: position value (%d) must be between 0 and %d for '%s'",
+              position, objectSize, resourceId));
+    }
+  }
+
+  /** Throws if this channel is not currently open. */
+  private void throwIfNotOpen() throws IOException {
+    if (!isOpen()) {
+      throw new ClosedChannelException();
+    }
+  }
+}
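
The channel above follows standard java.nio SeekableByteChannel semantics, which the new
GoogleHadoopFSInputStream presumably wraps. A minimal sketch of reading a byte range through any
SeekableByteChannel (the channel construction itself is internal to this patch and elided):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SeekableByteChannel;

    final class RangeReader {
      // Reads up to len bytes starting at offset; loops because read() may return short counts.
      static byte[] readRange(SeekableByteChannel channel, long offset, int len) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(len);
        channel.position(offset);            // lazy seek: no bytes are fetched yet
        while (buffer.hasRemaining()) {
          if (channel.read(buffer) < 0) {    // end of object reached
            break;
          }
        }
        buffer.flip();
        byte[] out = new byte[buffer.remaining()];
        buffer.get(out);
        return out;
      }
    }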
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageExceptions.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageExceptions.java
new file mode 100644
index 00000000000..95f0e41617c
--- /dev/null
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageExceptions.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs;
+
+import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.nullToEmpty;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import javax.annotation.Nullable;
+
+/**
+ * Miscellaneous helper methods for standardizing the types of exceptions thrown by the various
+ * GCS-based FileSystems.
+ */
+final class GoogleCloudStorageExceptions {
+
+  private GoogleCloudStorageExceptions() {}
+
+  /** Creates FileNotFoundException with suitable message for a GCS bucket or object. */
+  static FileNotFoundException createFileNotFoundException(
+      String bucketName, String objectName, @Nullable IOException cause) {
+    checkArgument(!isNullOrEmpty(bucketName), "bucketName must not be null or empty");
+    FileNotFoundException fileNotFoundException =
+        new FileNotFoundException(
+            String.format(
+                "Item not found: '%s'. Note, it is possible that the live version"
+                    + " is still available but the requested generation is deleted.",
+                StringPaths.fromComponents(bucketName, nullToEmpty(objectName))));
+    if (cause != null) {
+      fileNotFoundException.initCause(cause);
+    }
+    return fileNotFoundException;
+  }
+
+  static FileNotFoundException createFileNotFoundException(
+      StorageResourceId resourceId, @Nullable IOException cause) {
+    return createFileNotFoundException(
+        resourceId.getBucketName(), resourceId.getObjectName(), cause);
+  }
+}
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java
index aa1617e4da6..2b0c238eb02 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java
@@ -26,6 +26,7 @@
 import com.google.auth.Credentials;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,13 +34,20 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
+import java.nio.channels.SeekableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.DirectoryNotEmptyException;
 import java.nio.file.FileAlreadyExistsException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.TreeMap;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
 
 /**
  * Provides FS semantics over GCS based on Objects API.
@@ -73,6 +81,7 @@ class GoogleCloudStorageFileSystem {
 
   // URI of the root path.
   static final URI GCSROOT = URI.create(SCHEME + ":/");
+  private final GoogleHadoopFileSystemConfiguration configuration;
 
   // GCS access instance.
   private GoogleCloudStorage gcs;
@@ -87,6 +96,7 @@ private static GoogleCloudStorage createCloudStorage(
 
   GoogleCloudStorageFileSystem(final GoogleHadoopFileSystemConfiguration configuration,
       final Credentials credentials) throws IOException {
+    this.configuration = configuration;
     gcs = createCloudStorage(configuration, credentials);
   }
 
@@ -157,9 +167,7 @@ private GoogleCloudStorageItemInfo getFileInfoInternal(
             resourceId.getBucketName(),
             resourceId.getObjectName(),
             GET_FILE_INFO_LIST_OPTIONS);
-        LOG.trace("List for getMetadata returned {}. {}", listDirResult.size(), listDirResult);
         if (!listDirResult.isEmpty()) {
-          LOG.trace("Get metadata for directory returned non empty {}", listDirResult);
           return GoogleCloudStorageItemInfo.createInferredDirectory(resourceId.toDirectoryId());
         }
       }
@@ -371,4 +379,314 @@ public List<FileInfo> listFileInfo(URI path, ListFileOptions listOptions) throws
     fileInfos.sort(FILE_INFO_PATH_COMPARATOR);
     return fileInfos;
   }
+
+  FileInfo getFileInfoObject(URI path) throws IOException {
+    checkArgument(path != null, "path must not be null");
+    StorageResourceId resourceId = StorageResourceId.fromUriPath(path, true);
+    checkArgument(
+        !resourceId.isDirectory(),
+        String.format(
+            "path must be an object and not a directory, path: %s, resourceId: %s",
+            path, resourceId));
+    FileInfo fileInfo = FileInfo.fromItemInfo(gcs.getItemInfo(resourceId));
+    LOG.trace("getFileInfoObject(path: {}): {}", path, fileInfo);
+    return fileInfo;
+  }
+
+  SeekableByteChannel open(FileInfo fileInfo, GoogleHadoopFileSystemConfiguration config)
+      throws IOException {
+    checkNotNull(fileInfo, "fileInfo should not be null");
+    checkArgument(
+        !fileInfo.isDirectory(), "Cannot open a directory for reading: %s", fileInfo.getPath());
+
+    return gcs.open(fileInfo.getItemInfo(), config);
+  }
+
+  void rename(URI src, URI dst) throws IOException {
+    LOG.trace("rename(src: {}, dst: {})", src, dst);
+    checkNotNull(src);
+    checkNotNull(dst);
+    checkArgument(!src.equals(GCSROOT), "Root path cannot be renamed.");
+
+    // Parent of the destination path.
+    URI dstParent = UriPaths.getParentPath(dst);
+
+    // Obtain info on source, destination and destination-parent.
+    List<URI> paths = new ArrayList<>();
+    paths.add(src);
+    paths.add(dst);
+    if (dstParent != null) {
+      // dstParent is null if dst is GCSROOT.
+      paths.add(dstParent);
+    }
+    List<FileInfo> fileInfos = getFileInfos(paths);
+    FileInfo srcInfo = fileInfos.get(0);
+    FileInfo dstInfo = fileInfos.get(1);
+    FileInfo dstParentInfo = dstParent == null ? null : fileInfos.get(2);
+
+    // Throw if the source file does not exist.
+    if (!srcInfo.exists()) {
+      throw new FileNotFoundException("Item not found: " + src);
+    }
+
+    // Make sure paths match what getFileInfo() returned (it can add / at the end).
+    src = srcInfo.getPath();
+    dst = getDstUri(srcInfo, dstInfo, dstParentInfo);
+
+    // if src and dst are equal then do nothing
+    if (src.equals(dst)) {
+      return;
+    }
+
+    if (srcInfo.isDirectory()) {
+      renameDirectoryInternal(srcInfo, dst);
+    } else {
+      renameObject(src, dst, srcInfo);
+    }
+  }
+
+  private void renameObject(URI src, URI dst, FileInfo srcInfo) throws IOException {
+    StorageResourceId srcResourceId =
+        StorageResourceId.fromUriPath(src, /* allowEmptyObjectName= */ true);
+    StorageResourceId dstResourceId = StorageResourceId.fromUriPath(
+        dst,
+        /* allowEmptyObjectName= */ true,
+        /* generationId= */ 0L);
+
+    if (srcResourceId.getBucketName().equals(dstResourceId.getBucketName())) {
+      gcs.move(
+          ImmutableMap.of(
+              new StorageResourceId(
+                  srcInfo.getItemInfo().getBucketName(),
+                  srcInfo.getItemInfo().getObjectName(),
+                  srcInfo.getItemInfo().getContentGeneration()),
+              dstResourceId));
+    } else {
+      gcs.copy(ImmutableMap.of(srcResourceId, dstResourceId));
+
+      gcs.deleteObjects(
+          ImmutableList.of(
+              new StorageResourceId(
+                  srcInfo.getItemInfo().getBucketName(),
+                  srcInfo.getItemInfo().getObjectName(),
+                  srcInfo.getItemInfo().getContentGeneration())));
+    }
+  }
+
+  /**
+   * Renames given directory without checking any parameters.
+   *
+   * <p>GCS does not support atomic renames; therefore rename is implemented as copying source
+   * metadata to destination and then deleting source metadata. Note that only the metadata is
+   * copied and not the content of any file.
+   */
+  private void renameDirectoryInternal(FileInfo srcInfo, URI dst) throws IOException {
+    checkArgument(srcInfo.isDirectory(), "'%s' should be a directory", srcInfo);
+    checkArgument(dst.toString().endsWith(PATH_DELIMITER), "'%s' should be a directory", dst);
+
+    URI src = srcInfo.getPath();
+
+    // Mapping from each src to its respective dst.
+    // Sort src items so that parent directories appear before their children.
+    // That allows us to copy parent directories before we copy their children.
+    Map<FileInfo, URI> srcToDstItemNames = new TreeMap<>(FILE_INFO_PATH_COMPARATOR);
+    Map<FileInfo, URI> srcToDstMarkerItemNames = new TreeMap<>(FILE_INFO_PATH_COMPARATOR);
+
+    // List of individual paths to rename;
+    // we will try to carry out the copies in this list's order.
+    List<FileInfo> srcItemInfos =
+        listFileInfoForPrefix(src, ListFileOptions.DELETE_RENAME_LIST_OPTIONS);
+
+    // Create a list of sub-items to copy.
+    Pattern markerFilePattern = configuration.getMarkerFilePattern();
+    String prefix = src.toString();
+    for (FileInfo srcItemInfo : srcItemInfos) {
+      String relativeItemName = srcItemInfo.getPath().toString().substring(prefix.length());
+      URI dstItemName = dst.resolve(relativeItemName);
+      if (markerFilePattern != null && markerFilePattern.matcher(relativeItemName).matches()) {
+        srcToDstMarkerItemNames.put(srcItemInfo, dstItemName);
+      } else {
+        srcToDstItemNames.put(srcItemInfo, dstItemName);
+      }
+    }
+
+    StorageResourceId srcResourceId =
+        StorageResourceId.fromUriPath(src, /* allowEmptyObjectName= */ true);
+    StorageResourceId dstResourceId =
+        StorageResourceId.fromUriPath(
+            dst, /* allowEmptyObjectName= */ true, /* generationId= */ 0L);
+    if (srcResourceId.getBucketName().equals(dstResourceId.getBucketName())) {
+      // First, move all items except marker items
+      moveInternal(srcToDstItemNames);
+      // Finally, move marker items (if any) to mark rename operation success
+      moveInternal(srcToDstMarkerItemNames);
+
+      if (srcInfo.getItemInfo().isBucket()) {
+        deleteBucket(Collections.singletonList(srcInfo));
+      } else {
+        // If src is a directory then srcItemInfos does not contain its own name,
+        // so we delete that item separately here.
+        deleteObjects(Collections.singletonList(srcInfo));
+      }
+      return;
+    }
+
+    // TODO: Add support for across bucket moves
+    throw new UnsupportedOperationException(String.format(
+        "Moving object from bucket '%s' to '%s' is not supported",
+        srcResourceId.getBucketName(),
+        dstResourceId.getBucketName()));
+  }
+
+  List<FileInfo> listFileInfoForPrefix(URI prefix, ListFileOptions listOptions)
+      throws IOException {
+    LOG.trace("listAllFileInfoForPrefix(prefix: {})", prefix);
+    StorageResourceId prefixId = getPrefixId(prefix);
+    List<GoogleCloudStorageItemInfo> itemInfos =
+        gcs.listObjectInfo(
+            prefixId.getBucketName(),
+            prefixId.getObjectName(),
+            updateListObjectOptions(ListObjectOptions.DEFAULT_FLAT_LIST, listOptions));
+    List<FileInfo> fileInfos = FileInfo.fromItemInfos(itemInfos);
+    fileInfos.sort(FILE_INFO_PATH_COMPARATOR);
+    return fileInfos;
+  }
+
+  /** Moves items in given map that maps source items to destination items. */
+  private void moveInternal(Map<FileInfo, URI> srcToDstItemNames) throws IOException {
+    if (srcToDstItemNames.isEmpty()) {
+      return;
+    }
+
+    Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap = new HashMap<>();
+
+    // Prepare list of items to move.
+    for (Map.Entry<FileInfo, URI> srcToDstItemName : srcToDstItemNames.entrySet()) {
+      StorageResourceId srcResourceId = srcToDstItemName.getKey().getItemInfo().getResourceId();
+
+      StorageResourceId dstResourceId =
+          StorageResourceId.fromUriPath(srcToDstItemName.getValue(), true);
+      sourceToDestinationObjectsMap.put(srcResourceId, dstResourceId);
+    }
+
+    // Perform move.
+    gcs.move(sourceToDestinationObjectsMap);
+  }
+
+  private static ListObjectOptions updateListObjectOptions(
+      ListObjectOptions listObjectOptions, ListFileOptions listFileOptions) {
+    return listObjectOptions.builder().setFields(listFileOptions.getFields()).build();
+  }
+
+  private List<FileInfo> getFileInfos(List<URI> paths) throws IOException {
+    List<FileInfo> result = new ArrayList<>(paths.size());
+    for (URI path : paths) {
+      // TODO: Do this concurrently
+      result.add(getFileInfo(path));
+    }
+
+    return result;
+  }
+
+  private URI getDstUri(FileInfo srcInfo, FileInfo dstInfo, @Nullable FileInfo dstParentInfo)
+      throws IOException {
+    URI src = srcInfo.getPath();
+    URI dst = dstInfo.getPath();
+
+    // Throw if src is a file and dst == GCS_ROOT
+    if (!srcInfo.isDirectory() && dst.equals(GCSROOT)) {
+      throw new IOException("A file cannot be created in root.");
+    }
+
+    // Throw if the destination is a file that already exists and is not the source file.
+    if (dstInfo.exists() && !dstInfo.isDirectory() && (srcInfo.isDirectory() || !dst.equals(src))) {
+      throw new IOException("Cannot overwrite an existing file: " + dst);
+    }
+
+    // Rename operation cannot be completed if the parent of the destination does not exist.
+    if (dstParentInfo != null && !dstParentInfo.exists()) {
+      throw new IOException(
+          "Cannot rename because path does not exist: " + 
dstParentInfo.getPath());
+    }
+
+    // Leaf item of the source path.
+    String srcItemName = getItemName(src);
+
+    // Having taken care of the initial checks, apply the regular rules.
+    // After applying the rules, we will be left with 2 paths such that:
+    // -- either both are files or both are directories
+    // -- src exists and dst leaf does not exist
+    if (srcInfo.isDirectory()) {
+      // -- if src is a directory
+      //    -- dst is an existing file => disallowed
+      //    -- dst is a directory => rename the directory.
+
+      // The first case (dst is an existing file) is already checked earlier.
+      // If the destination path looks like a file, make it look like a
+      // directory path. This is because users often type 'mv foo bar'
+      // rather than 'mv foo bar/'.
+      if (!dstInfo.isDirectory()) {
+        dst = UriPaths.toDirectory(dst);
+      }
+
+      // Throw if renaming directory to self - this is forbidden
+      if (src.equals(dst)) {
+        throw new IOException("Rename dir to self is forbidden");
+      }
+
+      URI dstRelativeToSrc = src.relativize(dst);
+      // Throw if dst URI relative to src is not equal to dst,
+      // because this means that src is a parent directory of dst
+      // and src cannot be "renamed" to its subdirectory
+      if (!dstRelativeToSrc.equals(dst)) {
+        throw new IOException("Rename to subdir is forbidden");
+      }
+
+      if (dstInfo.exists()) {
+        dst =
+            dst.equals(GCSROOT)
+                ? UriPaths.fromStringPathComponents(
+                srcItemName, /* objectName= */ null, /* allowEmptyObjectName= */ true)
+                : UriPaths.toDirectory(dst.resolve(srcItemName));
+      }
+    } else {
+      // -- src is a file
+      //    -- dst is a file => rename the file.
+      //    -- dst is a directory => similar to the previous case after
+      //                             appending src file-name to dst
+
+      if (dstInfo.isDirectory()) {
+        if (!dstInfo.exists()) {
+          throw new IOException("Cannot rename because path does not exist: " 
+ dstInfo.getPath());
+        } else {
+          dst = dst.resolve(srcItemName);
+        }
+      }
+    }
+
+    return dst;
+  }
+
+  @Nullable
+  static String getItemName(URI path) {
+    checkNotNull(path, "path can not be null");
+
+    // There is no leaf item for the root path.
+    if (path.equals(GCSROOT)) {
+      return null;
+    }
+
+    StorageResourceId resourceId = StorageResourceId.fromUriPath(path, true);
+
+    if (resourceId.isBucket()) {
+      return resourceId.getBucketName();
+    }
+
+    String objectName = resourceId.getObjectName();
+    int index =
+        StringPaths.isDirectoryPath(objectName)
+            ? objectName.lastIndexOf(PATH_DELIMITER, objectName.length() - 2)
+            : objectName.lastIndexOf(PATH_DELIMITER);
+    return index < 0 ? objectName : objectName.substring(index + 1);
+  }
 }
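
For context, here is a minimal sketch (not part of this patch) of how the rename semantics described above surface through the public Hadoop FileSystem API; the bucket and object names are hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class GcsRenameSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new java.net.URI("gs://example-bucket/"), conf);

        // Renaming a directory moves every object under the source prefix;
        // parent entries are processed before their children, marker files last.
        boolean renamed = fs.rename(new Path("gs://example-bucket/data/"),
            new Path("gs://example-bucket/data-renamed/"));
        System.out.println("renamed: " + renamed);

        // Single objects can be renamed across buckets (copy, then delete);
        // directory renames across buckets are rejected in this change.
      }
    }
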
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFSInputStream.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFSInputStream.java
new file mode 100644
index 00000000000..26629fc79b2
--- /dev/null
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFSInputStream.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs;
+
+import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SeekableByteChannel;
+import javax.annotation.Nonnull;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem;
+
+final class GoogleHadoopFSInputStream extends FSInputStream {
+  public static final Logger LOG = 
LoggerFactory.getLogger(GoogleHadoopFSInputStream.class);
+
+  // Used for single-byte reads.
+  private final byte[] singleReadBuf = new byte[1];
+
+  // Path of the file to read.
+  private final URI gcsPath;
+  // File Info of gcsPath, will be pre-populated in some cases, i.e. when the Json client is used
+  // and failFast is disabled.
+
+  // All store IO access goes through this.
+  private final SeekableByteChannel channel;
+  // Number of bytes read through this channel.
+  private long totalBytesRead = 0;
+
+  /**
+   * Closed bit. Volatile so reads are non-blocking. Updates must be in a synchronized block to
+   * guarantee an atomic check and set.
+   */
+  private volatile boolean closed;
+
+  // Statistics tracker provided by the parent GoogleHadoopFileSystem for recording stats.
+  private final FileSystem.Statistics statistics;
+
+  static GoogleHadoopFSInputStream create(
+      GoogleHadoopFileSystem ghfs, URI gcsPath, FileSystem.Statistics statistics)
+      throws IOException {
+    LOG.trace("create(gcsPath: {})", gcsPath);
+    GoogleCloudStorageFileSystem gcsFs = ghfs.getGcsFs();
+    FileInfo fileInfo = gcsFs.getFileInfoObject(gcsPath);
+    SeekableByteChannel channel = gcsFs.open(fileInfo, ghfs.getFileSystemConfiguration());
+    return new GoogleHadoopFSInputStream(gcsPath, channel, statistics);
+  }
+
+  private GoogleHadoopFSInputStream(
+      URI gcsPath,
+      SeekableByteChannel channel,
+      FileSystem.Statistics statistics) {
+    LOG.trace("GoogleHadoopFSInputStream(gcsPath: %s)", gcsPath);
+    this.gcsPath = gcsPath;
+    this.channel = channel;
+    this.statistics = statistics;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+    checkNotClosed();
+    int numRead = read(singleReadBuf, /* offset= */ 0, /* length= */ 1);
+    checkState(
+        numRead == -1 || numRead == 1,
+        "Read %s bytes using single-byte buffer for path %s ending in position 
%s",
+        numRead,
+        gcsPath,
+        channel.position());
+    return numRead > 0 ? singleReadBuf[0] & 0xff : numRead;
+  }
+
+  @Override
+  public synchronized int read(@Nonnull byte[] buf, int offset, int length) throws IOException {
+    checkNotClosed();
+    checkNotNull(buf, "buf must not be null");
+    if (offset < 0 || length < 0 || length > buf.length - offset) {
+      throw new IndexOutOfBoundsException();
+    }
+
+    // TODO(user): Wrap this in a while-loop if we ever introduce a non-blocking mode for
+    // the underlying channel.
+    int numRead = channel.read(ByteBuffer.wrap(buf, offset, length));
+    if (numRead > 0) {
+      // Only count bytes that were actually read; -1 signals end of stream.
+      totalBytesRead += numRead;
+      statistics.incrementBytesRead(numRead);
+      statistics.incrementReadOps(1);
+    }
+    return numRead;
+  }
+
+  @Override
+  public synchronized void seek(long pos) throws IOException {
+    checkNotClosed();
+    LOG.trace("seek({})", pos);
+    try {
+      channel.position(pos);
+    } catch (IllegalArgumentException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (!closed) {
+      closed = true;
+
+      LOG.trace("close(): {}", gcsPath);
+      try {
+        if (channel != null) {
+          LOG.trace(
+              "Closing '{}' file with {} total bytes read", gcsPath, 
totalBytesRead);
+          channel.close();
+        }
+      } catch (Exception e) {
+        LOG.warn("Error while closing underneath read channel resources for 
path: {}", gcsPath, e);
+      }
+    }
+  }
+
+  /**
+   * Gets the current position within the file being read.
+   *
+   * @return The current position within the file being read.
+   * @throws IOException if an IO error occurs.
+   */
+  @Override
+  public synchronized long getPos() throws IOException {
+    checkNotClosed();
+    long pos = channel.position();
+    LOG.trace("getPos(): {}", pos);
+    return pos;
+  }
+
+  /**
+   * Seeks a different copy of the data. Not supported.
+   *
+   * @return true if a new source is found, false otherwise.
+   */
+  @Override
+  public boolean seekToNewSource(long targetPos) {
+    LOG.trace("seekToNewSource({}): false", targetPos);
+    return false;
+  }
+
+  @Override
+  public int available() throws IOException {
+    if (!channel.isOpen()) {
+      throw new ClosedChannelException();
+    }
+    return super.available();
+  }
+
+  /**
+   * Verify that the input stream is open. Non-blocking; this gives the last state of the volatile
+   * {@link #closed} field.
+   *
+   * @throws IOException if the connection is closed.
+   */
+  private void checkNotClosed() throws IOException {
+    if (closed) {
+      throw new IOException(gcsPath + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+  }
+}
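
A minimal sketch (not part of this patch, with hypothetical paths) of the read path this stream enables through the standard FSDataInputStream API:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class GcsReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new java.net.URI("gs://example-bucket/"), conf);

        try (FSDataInputStream in = fs.open(new Path("gs://example-bucket/logs/app.log"))) {
          byte[] buf = new byte[4096];
          int n = in.read(buf, 0, buf.length); // sequential read from the current position
          in.seek(1024);                       // repositions the underlying channel
          long pos = in.getPos();              // now 1024
          System.out.println("read " + n + " bytes, position " + pos);
        }
      }
    }
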
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java
index 8831568a356..8bf2f057724 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java
@@ -265,9 +265,10 @@ public String getScheme() {
   }
 
   @Override
-  public FSDataInputStream open(final Path path, final int bufferSize) throws IOException {
-    LOG.trace("open({})", path);
-    throw new UnsupportedOperationException(path.toString());
+  public FSDataInputStream open(final Path hadoopPath, final int bufferSize) throws IOException {
+    LOG.trace("open({})", hadoopPath);
+    URI gcsPath = getGcsPath(hadoopPath);
+    return new FSDataInputStream(GoogleHadoopFSInputStream.create(this, gcsPath, statistics));
   }
 
   @Override
@@ -327,9 +328,37 @@ public FSDataOutputStream append(final Path path, final int i, final Progressabl
   }
 
   @Override
-  public boolean rename(final Path path, final Path path1) throws IOException {
-    LOG.trace("rename({}, {})", path, path1);
-    throw new UnsupportedOperationException(path.toString());
+  public boolean rename(final Path src, final Path dst) throws IOException {
+    LOG.trace("rename({}, {})", src, dst);
+
+    checkArgument(src != null, "src must not be null");
+    checkArgument(dst != null, "dst must not be null");
+
+    // Even though the underlying GCSFS will also throw an IAE if src is root, since our filesystem
+    // root happens to equal the global root, we want to explicitly check it here since derived
+    // classes may not have filesystem roots equal to the global root.
+    if (this.makeQualified(src).equals(fsRoot)) {
+      LOG.trace("rename(src: {}, dst: {}): false [src is a root]", src, dst);
+      return false;
+    }
+
+    try {
+      checkOpen();
+
+      URI srcPath = getGcsPath(src);
+      URI dstPath = getGcsPath(dst);
+      getGcsFs().rename(srcPath, dstPath);
+
+      LOG.trace("rename(src: {}, dst: {}): true", src, dst);
+    } catch (IOException e) {
+      if (ApiErrorExtractor.INSTANCE.requestFailure(e)) {
+        throw e;
+      }
+      LOG.trace("rename(src: %s, dst: %s): false [failed]", src, dst, e);
+      return false;
+    }
+
+    return true;
   }
 
   @Override
@@ -468,8 +497,7 @@ public Path getWorkingDirectory() {
   public boolean mkdirs(final Path hadoopPath, final FsPermission permission) throws IOException {
     checkArgument(hadoopPath != null, "hadoopPath must not be null");
 
-    LOG.trace(
-        "mkdirs(hadoopPath: {}, permission: {}): true", hadoopPath, 
permission);
+    LOG.trace("mkdirs(hadoopPath: {}, permission: {}): true", hadoopPath, 
permission);
 
     checkOpen();
 
@@ -582,7 +610,6 @@ public void setWorkingDirectory(final Path hadoopPath) {
     LOG.trace("setWorkingDirectory(hadoopPath: {}): {}", hadoopPath, 
workingDirectory);
   }
 
-
   private static String getUgiUserName() throws IOException {
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     return ugi.getShortUserName();
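
Since rename() above reports most failures through its boolean return rather than an exception, callers that want fail-fast behaviour can wrap it. A small sketch (not part of this patch, helper name is hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class RenameOrThrow {
      // Converts the boolean contract of FileSystem#rename back into an exception.
      static void renameOrThrow(FileSystem fs, Path src, Path dst) throws IOException {
        if (!fs.rename(src, dst)) {
          throw new IOException("rename " + src + " -> " + dst + " did not take effect");
        }
      }
    }
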
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java
index a480a72e60b..20831885fe6 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.fs.gs;
 
+import java.util.regex.Pattern;
+
 import static java.lang.Math.toIntExact;
 
 import org.apache.hadoop.conf.Configuration;
@@ -26,6 +28,8 @@
  * This class provides a configuration for the {@link GoogleHadoopFileSystem} implementations.
  */
 class GoogleHadoopFileSystemConfiguration {
+  private static final Long GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT_DEFAULT = 8 * 1024 * 1024L;
+
   /**
    * Configuration key for default block size of a file.
    *
@@ -55,23 +59,79 @@ class GoogleHadoopFileSystemConfiguration {
   static final HadoopConfigurationProperty<Long> GCS_OUTPUT_STREAM_BUFFER_SIZE =
       new HadoopConfigurationProperty<>("fs.gs.outputstream.buffer.size", 8L * 1024 * 1024);
 
+
+  /**
+   * If forward seeks are within this many bytes of the current position, seeks are performed by
+   * reading and discarding bytes in-place rather than opening a new underlying stream.
+   */
+  public static final HadoopConfigurationProperty<Long> GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT =
+      new HadoopConfigurationProperty<>(
+          "fs.gs.inputstream.inplace.seek.limit",
+          GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT_DEFAULT);
+
+  /** Tunes object read behavior to optimize HTTP GET requests for various use cases. */
+  public static final HadoopConfigurationProperty<Fadvise> GCS_INPUT_STREAM_FADVISE =
+      new HadoopConfigurationProperty<>("fs.gs.inputstream.fadvise", Fadvise.RANDOM);
+
+  /**
+   * If false, reading a file with GZIP content encoding (HTTP header "Content-Encoding: gzip") will
+   * result in failure (IOException is thrown).
+   */
+  public static final HadoopConfigurationProperty<Boolean>
+      GCS_INPUT_STREAM_SUPPORT_GZIP_ENCODING_ENABLE =
+      new HadoopConfigurationProperty<>(
+          "fs.gs.inputstream.support.gzip.encoding.enable",
+          false);
+
+  /**
+   * Minimum size in bytes of the HTTP Range header set in a GCS request when opening a new stream
+   * to read an object.
+   */
+  public static final HadoopConfigurationProperty<Long> GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE =
+      new HadoopConfigurationProperty<>(
+          "fs.gs.inputstream.min.range.request.size",
+          2 * 1024 * 1024L);
+
+  /**
+   * Configuration key for the number of requests to track when adapting the access pattern in the
+   * fadvise modes AUTO and AUTO_RANDOM.
+   */
+  public static final HadoopConfigurationProperty<Integer> GCS_FADVISE_REQUEST_TRACK_COUNT =
+      new HadoopConfigurationProperty<>("fs.gs.fadvise.request.track.count", 3);
+
+  /**
+   * Configuration key for the maximum number of bytes rewritten in a single rewrite request when
+   * fs.gs.copy.with.rewrite.enable is set to 'true'.
+   */
+  public static final HadoopConfigurationProperty<Long> GCS_REWRITE_MAX_CHUNK_SIZE =
+      new HadoopConfigurationProperty<>(
+          "fs.gs.rewrite.max.chunk.size",
+          512 * 1024 * 1024L);
+
+  /** Configuration key for the marker file pattern. Default value: none. */
+  public static final HadoopConfigurationProperty<String> GCS_MARKER_FILE_PATTERN =
+      new HadoopConfigurationProperty<>("fs.gs.marker.file.pattern");
+
   private final String workingDirectory;
   private final String projectId;
+  private final Configuration config;
+  private Pattern fileMarkerFilePattern;
 
-  public int getOutStreamBufferSize() {
+  int getOutStreamBufferSize() {
     return outStreamBufferSize;
   }
 
   private final int outStreamBufferSize;
 
-  GoogleHadoopFileSystemConfiguration(Configuration config) {
-    this.workingDirectory = GCS_WORKING_DIRECTORY.get(config, config::get);
+  GoogleHadoopFileSystemConfiguration(Configuration conf) {
+    this.workingDirectory = GCS_WORKING_DIRECTORY.get(conf, conf::get);
     this.outStreamBufferSize =
-        toIntExact(GCS_OUTPUT_STREAM_BUFFER_SIZE.get(config, config::getLongBytes));
-    this.projectId = GCS_PROJECT_ID.get(config, config::get);
+        toIntExact(GCS_OUTPUT_STREAM_BUFFER_SIZE.get(conf, conf::getLongBytes));
+    this.projectId = GCS_PROJECT_ID.get(conf, conf::get);
+    this.config = conf;
   }
 
-  public String getWorkingDirectory() {
+  String getWorkingDirectory() {
     return this.workingDirectory;
   }
 
@@ -79,7 +139,53 @@ String getProjectId() {
     return this.projectId;
   }
 
-  public long getMaxListItemsPerCall() {
+  long getMaxListItemsPerCall() {
     return 5000L; //TODO: Make this configurable
   }
+
+  Fadvise getFadvise() {
+    return GCS_INPUT_STREAM_FADVISE.get(config, config::getEnum);
+  }
+
+  long getInplaceSeekLimit() {
+    return GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT.get(config, config::getLongBytes);
+  }
+
+  public int getFadviseRequestTrackCount() {
+    return GCS_FADVISE_REQUEST_TRACK_COUNT.get(config, config::getInt);
+  }
+
+  public boolean isGzipEncodingSupportEnabled() {
+    return GCS_INPUT_STREAM_SUPPORT_GZIP_ENCODING_ENABLE.get(config, config::getBoolean);
+  }
+
+  public long getMinRangeRequestSize() {
+    return GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE.get(config, config::getLongBytes);
+  }
+
+  public long getBlockSize() {
+    return BLOCK_SIZE.get(config, config::getLong);
+  }
+
+  public boolean isReadExactRequestedBytesEnabled() {
+    return false; //TODO: Remove this option?
+  }
+
+  public long getMaxRewriteChunkSize() {
+    return GCS_REWRITE_MAX_CHUNK_SIZE.get(config, config::getLong);
+  }
+
+  public Pattern getMarkerFilePattern() {
+    String pattern = GCS_MARKER_FILE_PATTERN.get(config, config::get);
+    if (pattern == null) {
+      return null;
+    }
+
+    if (fileMarkerFilePattern == null) {
+      // Caching the compiled pattern since compilation can be expensive.
+      fileMarkerFilePattern = Pattern.compile("^(.+/)?" + pattern + "$");
+    }
+
+    return fileMarkerFilePattern;
+  }
 }
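
The new read-path properties above can be tuned per job. A minimal sketch (not part of this patch, values purely illustrative) of setting them on a Hadoop Configuration:

    import org.apache.hadoop.conf.Configuration;

    public class GcsReadTuningSketch {
      public static Configuration tunedConf() {
        Configuration conf = new Configuration();
        conf.set("fs.gs.inputstream.fadvise", "AUTO");          // RANDOM, SEQUENTIAL, AUTO or AUTO_RANDOM
        conf.setLong("fs.gs.inputstream.inplace.seek.limit", 8L * 1024 * 1024);
        conf.setLong("fs.gs.inputstream.min.range.request.size", 2L * 1024 * 1024);
        conf.setBoolean("fs.gs.inputstream.support.gzip.encoding.enable", false);
        conf.setInt("fs.gs.fadvise.request.track.count", 3);
        conf.set("fs.gs.marker.file.pattern", "_SUCCESS");      // hypothetical marker file name
        return conf;
      }
    }
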
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Gs.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Gs.java
new file mode 100644
index 00000000000..e0b5ec4acaf
--- /dev/null
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Gs.java
@@ -0,0 +1,64 @@
+package org.apache.hadoop.fs.gs;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DelegateToFileSystem;
+
+/**
+ * GCS implementation of AbstractFileSystem.
+ * This implementation delegates to GoogleHadoopFileSystem.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class Gs extends DelegateToFileSystem {
+  public Gs(URI theUri, Configuration conf) throws IOException, URISyntaxException {
+    super(theUri, new GoogleHadoopFileSystem(), conf,
+        theUri.getScheme().isEmpty() ? Constants.SCHEME : theUri.getScheme(), false);
+  }
+
+  @Override
+  public int getUriDefaultPort() {
+    return super.getUriDefaultPort();
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("gs{");
+    sb.append("URI =").append(fsImpl.getUri());
+    sb.append("; fsImpl=").append(fsImpl);
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Close the file system; the FileContext API doesn't have an explicit close.
+   */
+  @Override
+  protected void finalize() throws Throwable {
+    fsImpl.close();
+    super.finalize();
+  }
+}
\ No newline at end of file
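
To use the new AbstractFileSystem through the FileContext API, the gs scheme has to be bound to this class. A hedged sketch (not part of this patch; the property name follows the usual fs.AbstractFileSystem.<scheme>.impl convention, and the bucket is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.Path;

    public class GsFileContextSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.AbstractFileSystem.gs.impl", "org.apache.hadoop.fs.gs.Gs");

        FileContext fc = FileContext.getFileContext(new java.net.URI("gs://example-bucket/"), conf);
        System.out.println(fc.getFileStatus(new Path("gs://example-bucket/data/part-00000")));
      }
    }
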
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java
index 2bc74c6fc21..6ef7b7641f5 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java
@@ -22,6 +22,10 @@
 
 final class ListFileOptions {
   static final ListFileOptions OBJECTFIELDS = new ListFileOptions("bucket,name,size,updated");
+
+  static final ListFileOptions DELETE_RENAME_LIST_OPTIONS =
+      new ListFileOptions("bucket,name,generation");
+
   private final String fields;
 
   private ListFileOptions(@Nonnull String fields) {
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractOpen.java
similarity index 64%
copy from hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java
copy to hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractOpen.java
index 2bc74c6fc21..c3d7c6ddac7 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java
+++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractOpen.java
@@ -16,19 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.fs.gs;
+package org.apache.hadoop.fs.gs.contract;
 
-import javax.annotation.Nonnull;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
 
-final class ListFileOptions {
-  static final ListFileOptions OBJECTFIELDS = new ListFileOptions("bucket,name,size,updated");
-  private final String fields;
-
-  private ListFileOptions(@Nonnull String fields) {
-    this.fields = fields;
-  }
-
-  String getFields() {
-    return fields;
+/** GCS contract tests covering file open. */
+public class ITestGoogleContractOpen extends AbstractContractOpenTest {
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new GoogleContract(conf);
   }
 }
diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java
new file mode 100644
index 00000000000..5d9459cac19
--- /dev/null
+++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.gs.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+/** GCS contract tests covering file rename. */
+public class ITestGoogleContractRename extends AbstractContractRenameTest {
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new GoogleContract(conf);
+  }
+
+  @Override
+  public void testRenameWithNonEmptySubDir() {
+    // TODO: Enable this
+    ContractTestUtils.skip("Skipping the test. This will be enabled in a 
subsequent change");
+  }
+
+  @Override
+  public void testRenameNonexistentFile() {
+    // TODO: Enable this
+    ContractTestUtils.skip("Skipping the test. This will be enabled in a 
subsequent change");
+  }
+}
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractSeek.java
similarity index 67%
copy from hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java
copy to hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractSeek.java
index 2bc74c6fc21..651487bd641 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java
+++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractSeek.java
@@ -16,19 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.fs.gs;
+package org.apache.hadoop.fs.gs.contract;
 
-import javax.annotation.Nonnull;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
 
-final class ListFileOptions {
-  static final ListFileOptions OBJECTFIELDS = new ListFileOptions("bucket,name,size,updated");
-  private final String fields;
-
-  private ListFileOptions(@Nonnull String fields) {
-    this.fields = fields;
-  }
-
-  String getFields() {
-    return fields;
+public class ITestGoogleContractSeek extends AbstractContractSeekTest {
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new GoogleContract(conf);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

