This is an automated email from the ASF dual-hosted git repository.

cnauroth pushed a commit to branch HADOOP-19343
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 13614c3bd3ee56f2979ce8a286f8cfe51c156e14
Author: Arunkumar Chacko <aruncha...@google.com>
AuthorDate: Tue Jun 10 04:53:19 2025 +0000

    HADOOP-19343: Add support for mkdir() and getFileStatus()
    
    Closes #7721
    
    Signed-off-by: Chris Nauroth <cnaur...@apache.org>
---
 hadoop-project/pom.xml                             |   2 +-
 hadoop-tools/hadoop-gcp/pom.xml                    |  11 +
 .../org/apache/hadoop/fs/gs/ApiErrorExtractor.java | 327 +++++++++++++++++
 .../apache/hadoop/fs/gs/CreateBucketOptions.java   |  81 +++++
 .../apache/hadoop/fs/gs/CreateObjectOptions.java   | 127 +++++++
 .../apache/hadoop/fs/gs/ErrorTypeExtractor.java    |  47 ++-
 .../java/org/apache/hadoop/fs/gs/FileInfo.java     |  14 +-
 .../apache/hadoop/fs/gs/GoogleCloudStorage.java    | 385 ++++++++++++++++++++-
 .../hadoop/fs/gs/GoogleCloudStorageFileSystem.java | 285 +++++++++++++++
 .../hadoop/fs/gs/GoogleCloudStorageItemInfo.java   |  14 +-
 .../hadoop/fs/gs/GoogleHadoopFileSystem.java       | 156 +++++++--
 .../fs/gs/GoogleHadoopFileSystemConfiguration.java |  18 +-
 .../org/apache/hadoop/fs/gs/IoExceptionHelper.java |  83 +++++
 .../org/apache/hadoop/fs/gs/ListFileOptions.java   |  34 ++
 .../org/apache/hadoop/fs/gs/ListObjectOptions.java | 141 ++++++++
 .../hadoop/fs/gs/VerificationAttributes.java       |  14 +-
 .../org/apache/hadoop/fs/gs/TestConfiguration.java |  64 ++++
 .../apache/hadoop/fs/gs/TestStorageResourceId.java |  14 +-
 .../org/apache/hadoop/fs/gs/TestStringPaths.java   |  14 +-
 .../java/org/apache/hadoop/fs/gs/TestUriPaths.java |  14 +-
 .../hadoop/fs/gs/contract/GoogleContract.java      |  44 +++
 .../fs/gs/contract/ITestGoogleContractDelete.java  |  37 ++
 .../contract/ITestGoogleContractGetFileStatus.java |  30 ++
 .../fs/gs/contract/ITestGoogleContractMkdir.java   |  61 ++++
 .../apache/hadoop/fs/gs/contract/package-info.java |  22 ++
 25 files changed, 1959 insertions(+), 80 deletions(-)

diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 3e0cb4dfbf9..1cb6b157ccd 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -2156,7 +2156,7 @@
       <dependency>
       <groupId>com.google.cloud</groupId>
       <artifactId>google-cloud-storage</artifactId>
-      <version>2.44.1</version>
+      <version>2.52.0</version>
     </dependency>
     </dependencies>
   </dependencyManagement>
diff --git a/hadoop-tools/hadoop-gcp/pom.xml b/hadoop-tools/hadoop-gcp/pom.xml
index d5744f1f97c..2da2881ab79 100644
--- a/hadoop-tools/hadoop-gcp/pom.xml
+++ b/hadoop-tools/hadoop-gcp/pom.xml
@@ -261,6 +261,10 @@
                   <include>com.lmax</include>
                   <include>io.grpc</include>
                   <include>io.opencensus</include>
+                  <include>io.opentelemetry</include>
+                  <include>io.opentelemetry.api</include>
+                  <include>io.opentelemetry.contrib</include>
+                  <include>io.opentelemetry.semconv</include>
                   <include>io.perfmark</include>
                   <include>org.apache.httpcomponents</include>
                   <include>org.threeten:threetenbp</include>
@@ -282,6 +286,7 @@
                     <include>com.google.cloud.hadoop.util.**</include>
                     <include>com.google.cloud.http.**</include>
                     <include>com.google.cloud.monitoring.**</include>
+                    <include>com.google.cloud.opentelemetry.**</include>
                     <include>com.google.cloud.spi.**</include>
                     <include>com.google.cloud.storage.**</include>
                     <include>com.google.common.**</include>
@@ -459,6 +464,12 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
     <dependency>
       <groupId>org.assertj</groupId>
       <artifactId>assertj-core</artifactId>
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ApiErrorExtractor.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ApiErrorExtractor.java
new file mode 100644
index 00000000000..4fef41b1971
--- /dev/null
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ApiErrorExtractor.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs;
+
+import com.google.api.client.googleapis.json.GoogleJsonError;
+import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.HttpResponseException;
+import com.google.api.client.http.HttpStatusCodes;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Translates exceptions from API calls into higher-level meaning, while allowing injectability
+ * for testing how API errors are handled.
+ */
+class ApiErrorExtractor {
+
+  /** Singleton instance of the ApiErrorExtractor. */
+  public static final ApiErrorExtractor INSTANCE = new ApiErrorExtractor();
+
+  public static final int STATUS_CODE_RANGE_NOT_SATISFIABLE = 416;
+
+  public static final String GLOBAL_DOMAIN = "global";
+  public static final String USAGE_LIMITS_DOMAIN = "usageLimits";
+
+  public static final String RATE_LIMITED_REASON = "rateLimitExceeded";
+  public static final String USER_RATE_LIMITED_REASON = "userRateLimitExceeded";
+
+  public static final String QUOTA_EXCEEDED_REASON = "quotaExceeded";
+
+  // These come with "The account for ... has been disabled" message.
+  public static final String ACCOUNT_DISABLED_REASON = "accountDisabled";
+
+  // These come with "Project marked for deletion" message.
+  public static final String ACCESS_NOT_CONFIGURED_REASON = "accessNotConfigured";
+
+  // These are 400 error codes with "resource 'xyz' is not ready" message.
+  // These sometimes happen when a create operation is still in-flight, but the
+  // resource representation is already available via a get call.
+  public static final String RESOURCE_NOT_READY_REASON = "resourceNotReady";
+
+  // HTTP 413 with message "Value for field 'foo' is too large".
+  public static final String FIELD_SIZE_TOO_LARGE_REASON = "fieldSizeTooLarge";
+
+  // HTTP 400 message for 'USER_PROJECT_MISSING' error.
+  public static final String USER_PROJECT_MISSING_MESSAGE =
+      "Bucket is a requester pays bucket but no user project provided.";
+
+  // The debugInfo field present on Errors collection in GoogleJsonException
+  // as an unknown key.
+  private static final String DEBUG_INFO_FIELD = "debugInfo";
+
+  /**
+   * Determines if the given exception indicates intermittent request failure or failure caused by
+   * user error.
+   */
+  public boolean requestFailure(IOException e) {
+    HttpResponseException httpException = getHttpResponseException(e);
+    return httpException != null
+        && (accessDenied(httpException)
+            || badRequest(httpException)
+            || internalServerError(httpException)
+            || rateLimited(httpException)
+            || IoExceptionHelper.isSocketError(httpException)
+            || unauthorized(httpException));
+  }
+
+  /**
+   * Determines if the given exception indicates 'access denied'. Recursively checks getCause() if
+   * outer exception isn't an instance of the correct class.
+   *
+   * <p>Warning: this method only checks for the access denied status code; however, this may
+   * include potentially recoverable reason codes such as rate limiting. For an alternative, see
+   * {@link #accessDeniedNonRecoverable(IOException)}.
+   */
+  public boolean accessDenied(IOException e) {
+    return recursiveCheckForCode(e, HttpStatusCodes.STATUS_CODE_FORBIDDEN);
+  }
+
+  /** Determines if the given exception indicates bad request. */
+  public boolean badRequest(IOException e) {
+    return recursiveCheckForCode(e, HttpStatusCodes.STATUS_CODE_BAD_REQUEST);
+  }
+
+  /**
+   * Determines if the given exception indicates the request was unauthenticated. This can be caused
+   * by attaching invalid credentials to a request.
+   */
+  public boolean unauthorized(IOException e) {
+    return recursiveCheckForCode(e, HttpStatusCodes.STATUS_CODE_UNAUTHORIZED);
+  }
+
+  /**
+   * Determines if the exception is a non-recoverable access denied code (such as account closed or
+   * marked for deletion).
+   */
+  public boolean accessDeniedNonRecoverable(IOException e) {
+    ErrorInfo errorInfo = getErrorInfo(e);
+    String reason = errorInfo != null ? errorInfo.getReason() : null;
+    return ACCOUNT_DISABLED_REASON.equals(reason) || ACCESS_NOT_CONFIGURED_REASON.equals(reason);
+  }
+
+  /** Determines if the exception is a client error. */
+  public boolean clientError(IOException e) {
+    HttpResponseException httpException = getHttpResponseException(e);
+    return httpException != null && getHttpStatusCode(httpException) / 100 == 4;
+  }
+
+  /** Determines if the exception is an internal server error. */
+  public boolean internalServerError(IOException e) {
+    HttpResponseException httpException = getHttpResponseException(e);
+    return httpException != null && getHttpStatusCode(httpException) / 100 == 5;
+  }
+
+  /**
+   * Determines if the given exception indicates 'item already exists'. Recursively checks
+   * getCause() if outer exception isn't an instance of the correct class.
+   */
+  public boolean itemAlreadyExists(IOException e) {
+    return recursiveCheckForCode(e, HttpStatusCodes.STATUS_CODE_CONFLICT);
+  }
+
+  /**
+   * Determines if the given exception indicates 'item not found'. Recursively checks getCause() if
+   * outer exception isn't an instance of the correct class.
+   */
+  public boolean itemNotFound(IOException e) {
+    return recursiveCheckForCode(e, HttpStatusCodes.STATUS_CODE_NOT_FOUND);
+  }
+
+  /**
+   * Determines if the given exception indicates 'field size too large'. Recursively checks
+   * getCause() if outer exception isn't an instance of the correct class.
+   */
+  public boolean fieldSizeTooLarge(IOException e) {
+    ErrorInfo errorInfo = getErrorInfo(e);
+    return errorInfo != null && FIELD_SIZE_TOO_LARGE_REASON.equals(errorInfo.getReason());
+  }
+
+  /**
+   * Determines if the given exception indicates 'resource not ready'. Recursively checks getCause()
+   * if outer exception isn't an instance of the correct class.
+   */
+  public boolean resourceNotReady(IOException e) {
+    ErrorInfo errorInfo = getErrorInfo(e);
+    return errorInfo != null && RESOURCE_NOT_READY_REASON.equals(errorInfo.getReason());
+  }
+
+  /**
+   * Determines if the given IOException indicates 'precondition not met'. Recursively checks
+   * getCause() if outer exception isn't an instance of the correct class.
+   */
+  public boolean preconditionNotMet(IOException e) {
+    return recursiveCheckForCode(e, HttpStatusCodes.STATUS_CODE_PRECONDITION_FAILED);
+  }
+
+  /**
+   * Determines if the given exception indicates 'range not satisfiable'. Recursively checks
+   * getCause() if outer exception isn't an instance of the correct class.
+   */
+  public boolean rangeNotSatisfiable(IOException e) {
+    return recursiveCheckForCode(e, STATUS_CODE_RANGE_NOT_SATISFIABLE);
+  }
+
+  /**
+   * Determines if a given Throwable is caused by a rate limit being applied. Recursively checks
+   * getCause() if outer exception isn't an instance of the correct class.
+   *
+   * @param e The Throwable to check.
+   * @return True if the Throwable is a result of rate limiting being applied.
+   */
+  public boolean rateLimited(IOException e) {
+    ErrorInfo errorInfo = getErrorInfo(e);
+    if (errorInfo != null) {
+      String domain = errorInfo.getDomain();
+      boolean isRateLimitedOrGlobalDomain =
+          USAGE_LIMITS_DOMAIN.equals(domain) || GLOBAL_DOMAIN.equals(domain);
+      String reason = errorInfo.getReason();
+      boolean isRateLimitedReason =
+          RATE_LIMITED_REASON.equals(reason) || USER_RATE_LIMITED_REASON.equals(reason);
+      return isRateLimitedOrGlobalDomain && isRateLimitedReason;
+    }
+    return false;
+  }
+
+  /**
+   * Determines if a given Throwable is caused by Quota Exceeded. Recursively checks getCause() if
+   * outer exception isn't an instance of the correct class.
+   */
+  public boolean quotaExceeded(IOException e) {
+    ErrorInfo errorInfo = getErrorInfo(e);
+    return errorInfo != null && QUOTA_EXCEEDED_REASON.equals(errorInfo.getReason());
+  }
+
+  /**
+   * Determines if the given exception indicates that 'userProject' is missing in the request.
+   * Recursively checks getCause() if outer exception isn't an instance of the correct class.
+   */
+  public boolean userProjectMissing(IOException e) {
+    GoogleJsonError jsonError = getJsonError(e);
+    return jsonError != null
+        && jsonError.getCode() == HttpStatusCodes.STATUS_CODE_BAD_REQUEST
+        && USER_PROJECT_MISSING_MESSAGE.equals(jsonError.getMessage());
+  }
+
+  /** Extracts the error message. */
+  public String getErrorMessage(IOException e) {
+    // Prefer to use message from GJRE.
+    GoogleJsonError jsonError = getJsonError(e);
+    return jsonError == null ? e.getMessage() : jsonError.getMessage();
+  }
+
+  /**
+   * Converts the exception to a user-presentable error message. Specifically, extracts message
+   * field for HTTP 4xx codes, and creates a generic "Internal Server Error" for HTTP 5xx codes.
+   *
+   * @param e the exception
+   * @param action the description of the action being performed at the time of error.
+   * @see #toUserPresentableMessage(IOException, String)
+   */
+  public IOException toUserPresentableException(IOException e, String action) throws IOException {
+    throw new IOException(toUserPresentableMessage(e, action), e);
+  }
+
+  /**
+   * Converts the exception to a user-presentable error message. Specifically, extracts message
+   * field for HTTP 4xx codes, and creates a generic "Internal Server Error" for HTTP 5xx codes.
+   */
+  public String toUserPresentableMessage(IOException e, @Nullable String action) {
+    String message = "Internal server error";
+    if (clientError(e)) {
+      message = getErrorMessage(e);
+    }
+    return action == null
+        ? message
+        : String.format("Encountered an error while %s: %s", action, message);
+  }
+
+  /** See {@link #toUserPresentableMessage(IOException, String)}. */
+  public String toUserPresentableMessage(IOException e) {
+    return toUserPresentableMessage(e, null);
+  }
+
+  @Nullable
+  public String getDebugInfo(IOException e) {
+    ErrorInfo errorInfo = getErrorInfo(e);
+    return errorInfo != null ? (String) errorInfo.getUnknownKeys().get(DEBUG_INFO_FIELD) : null;
+  }
+
+  /**
+   * Returns HTTP status code from the given exception.
+   *
+   * <p>Note: the GoogleJsonResponseException.getStatusCode() method is marked final, therefore it
+   * cannot be mocked using Mockito. We use this helper so that we can override it in tests.
+   */
+  protected int getHttpStatusCode(HttpResponseException e) {
+    return e.getStatusCode();
+  }
+
+  /**
+   * Get the first ErrorInfo from an IOException if it is an instance of
+   * GoogleJsonResponseException, otherwise return null.
+   */
+  @Nullable
+  protected ErrorInfo getErrorInfo(IOException e) {
+    GoogleJsonError jsonError = getJsonError(e);
+    List<ErrorInfo> errors = jsonError != null ? jsonError.getErrors() : ImmutableList.of();
+    return errors != null ? Iterables.getFirst(errors, null) : null;
+  }
+
+  /** If the exception is a GoogleJsonResponseException, get the error details, else return null. */
+  @Nullable
+  protected GoogleJsonError getJsonError(IOException e) {
+    GoogleJsonResponseException jsonException = getJsonResponseException(e);
+    return jsonException == null ? null : jsonException.getDetails();
+  }
+
+  /** Recursively checks getCause() if outer exception isn't an instance of the correct class. */
+  protected boolean recursiveCheckForCode(IOException e, int code) {
+    HttpResponseException httpException = getHttpResponseException(e);
+    return httpException != null && getHttpStatusCode(httpException) == code;
+  }
+
+  @Nullable
+  public static GoogleJsonResponseException getJsonResponseException(Throwable throwable) {
+    Throwable cause = throwable;
+    while (cause != null) {
+      if (cause instanceof GoogleJsonResponseException) {
+        return (GoogleJsonResponseException) cause;
+      }
+      cause = cause.getCause();
+    }
+    return null;
+  }
+
+  @Nullable
+  public static HttpResponseException getHttpResponseException(Throwable throwable) {
+    Throwable cause = throwable;
+    while (cause != null) {
+      if (cause instanceof HttpResponseException) {
+        return (HttpResponseException) cause;
+      }
+      cause = cause.getCause();
+    }
+    return null;
+  }
+}
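
A minimal sketch of how these classifiers are meant to be consumed; the
translateException helper below is hypothetical and not part of this patch,
shown only to illustrate the intended call pattern:

    // Hypothetical caller in org.apache.hadoop.fs.gs: translate a raw
    // IOException from the JSON API using the classifiers defined above.
    static IOException translateException(IOException e, String path) {
      ApiErrorExtractor errors = ApiErrorExtractor.INSTANCE;
      if (errors.itemNotFound(e)) {
        return new java.io.FileNotFoundException("Item not found: " + path);
      }
      if (errors.rateLimited(e) || errors.quotaExceeded(e)) {
        return e; // retryable class of errors; callers would back off and retry
      }
      return new IOException(errors.toUserPresentableMessage(e, "accessing " + path), e);
    }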
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateBucketOptions.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateBucketOptions.java
new file mode 100644
index 00000000000..46cd2a7efbd
--- /dev/null
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateBucketOptions.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs;
+
+import java.time.Duration;
+
+final class CreateBucketOptions {
+  // TODO: Make sure the defaults match the settings of the existing connector.
+  static final CreateBucketOptions DEFAULT = new Builder().build();
+  private final String location;
+  private final String storageClass;
+  private final Duration ttl;
+  private final String projectId;
+
+  private CreateBucketOptions(Builder builder) {
+    this.location = builder.location;
+    this.storageClass = builder.storageClass;
+    this.ttl = builder.ttl;
+    this.projectId = builder.projectId;
+  }
+
+  public String getLocation() {
+    return location;
+  }
+
+  public String getStorageClass() {
+    return storageClass;
+  }
+
+  public Duration getTtl() {
+    return ttl;
+  }
+
+  static class Builder {
+    private String location;
+    private String storageClass;
+    private Duration ttl;
+    private String projectId;
+
+    public Builder withLocation(String loc) {
+      this.location = loc;
+      return this;
+    }
+
+    public Builder withStorageClass(String sc) {
+      this.storageClass = sc;
+      return this;
+    }
+
+    public Builder withTtl(Duration ttlDuration) {
+      this.ttl = ttlDuration;
+      return this;
+    }
+
+    public Builder withProjectId(String pid) {
+      this.projectId = pid;
+      return this;
+    }
+
+    public CreateBucketOptions build() {
+      return new CreateBucketOptions(this);
+    }
+  }
+}
+
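
A usage sketch for the builder; all values below are placeholders, not
defaults from this patch:

    // Hypothetical usage: every field is optional; createBucket() applies
    // the storage class and TTL only when they are non-null.
    CreateBucketOptions options = new CreateBucketOptions.Builder()
        .withLocation("US-CENTRAL1")
        .withStorageClass("STANDARD")
        .withTtl(java.time.Duration.ofDays(30))
        .build();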
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateObjectOptions.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateObjectOptions.java
new file mode 100644
index 00000000000..26c91fcae7b
--- /dev/null
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateObjectOptions.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs;
+
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Options that can be specified when creating a file in the {@link GoogleCloudStorage}. */
+final class CreateObjectOptions {
+  static final CreateObjectOptions DEFAULT_OVERWRITE = builder().setOverwriteExisting(true).build();
+
+  private final String contentEncoding;
+  private final String contentType;
+  private final boolean ensureEmptyObjectsMetadataMatch;
+  private final String kmsKeyName;
+  private final ImmutableMap<String, byte[]> metadata;
+  private final boolean overwriteExisting;
+
+  private CreateObjectOptions(Builder builder) {
+    this.contentEncoding = builder.contentEncoding;
+    this.contentType = builder.contentType;
+    this.ensureEmptyObjectsMetadataMatch = builder.ensureEmptyObjectsMetadataMatch;
+    this.kmsKeyName = builder.kmsKeyName;
+    this.metadata = ImmutableMap.copyOf(builder.metadata);
+    this.overwriteExisting = builder.overwriteExisting;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public String getContentEncoding() {
+    return contentEncoding;
+  }
+
+  public String getContentType() {
+    return contentType;
+  }
+
+  public boolean isEnsureEmptyObjectsMetadataMatch() {
+    return ensureEmptyObjectsMetadataMatch;
+  }
+
+  public String getKmsKeyName() {
+    return kmsKeyName;
+  }
+
+  public Map<String, byte[]> getMetadata() {
+    return metadata;
+  }
+
+  public boolean isOverwriteExisting() {
+    return overwriteExisting;
+  }
+
+  public Builder toBuilder() {
+    return builder().setContentEncoding(this.contentEncoding).setContentType(this.contentType)
+        .setEnsureEmptyObjectsMetadataMatch(this.ensureEmptyObjectsMetadataMatch)
+        .setKmsKeyName(this.kmsKeyName).setMetadata(this.metadata)
+        .setOverwriteExisting(this.overwriteExisting);
+  }
+
+  static final class Builder {
+    private String contentEncoding;
+    private String contentType;
+    private boolean ensureEmptyObjectsMetadataMatch = false;
+    private String kmsKeyName;
+    private Map<String, byte[]> metadata = new HashMap<>();
+    private boolean overwriteExisting = false;
+
+    private Builder() {
+    }
+
+    public Builder setContentEncoding(String ce) {
+      this.contentEncoding = ce;
+      return this;
+    }
+
+    public Builder setContentType(String ct) {
+      this.contentType = ct;
+      return this;
+    }
+
+    public Builder setEnsureEmptyObjectsMetadataMatch(boolean val) {
+      this.ensureEmptyObjectsMetadataMatch = val;
+      return this;
+    }
+
+    public Builder setKmsKeyName(String key) {
+      this.kmsKeyName = key;
+      return this;
+    }
+
+    public Builder setMetadata(Map<String, byte[]> m) {
+      this.metadata = m;
+      return this;
+    }
+
+    public Builder setOverwriteExisting(boolean overwrite) {
+      this.overwriteExisting = overwrite;
+      return this;
+    }
+
+    public CreateObjectOptions build() {
+      return new CreateObjectOptions(this);
+    }
+  }
+}
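
A usage sketch; the content type shown is a placeholder:

    // Hypothetical usage: derive options from the overwrite default via toBuilder().
    CreateObjectOptions options = CreateObjectOptions.DEFAULT_OVERWRITE.toBuilder()
        .setContentType("application/octet-stream")
        .build();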
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ErrorTypeExtractor.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ErrorTypeExtractor.java
index a4497734524..547d855d1d6 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ErrorTypeExtractor.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ErrorTypeExtractor.java
@@ -1,11 +1,13 @@
 /*
- * Copyright 2023 Google LLC
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,13 +18,46 @@
 
 package org.apache.hadoop.fs.gs;
 
+import javax.annotation.Nullable;
+
 import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
 
 /**
 * Implementation for {@link ErrorTypeExtractor} for exceptions specifically thrown from the gRPC path.
  */
 final class ErrorTypeExtractor {
 
+  static boolean bucketAlreadyExists(Exception e) {
+    ErrorType errorType = getErrorType(e);
+    if (errorType == ErrorType.ALREADY_EXISTS) {
+      return true;
+    } else if (errorType == ErrorType.FAILED_PRECONDITION) {
+      // The gRPC API currently throws a FAILED_PRECONDITION status code instead of
+      // ALREADY_EXISTS, so we handle both these conditions in the interim.
+      StatusRuntimeException statusRuntimeException = getStatusRuntimeException(e);
+      return statusRuntimeException != null
+          && BUCKET_ALREADY_EXISTS_MESSAGE.equals(statusRuntimeException.getMessage());
+    }
+    return false;
+  }
+
+  @Nullable
+  private static StatusRuntimeException getStatusRuntimeException(Exception e) {
+    Throwable cause = e;
+    // Keeping a counter to break early from the loop to avoid an infinite loop due to
+    // cyclic exception chains.
+    int currentExceptionDepth = 0;
+    final int maxChainDepth = 1000;
+    while (cause != null && currentExceptionDepth < maxChainDepth) {
+      if (cause instanceof StatusRuntimeException) {
+        return (StatusRuntimeException) cause;
+      }
+      cause = cause.getCause();
+      currentExceptionDepth++;
+    }
+    return null;
+  }
+
   enum ErrorType {
     NOT_FOUND, OUT_OF_RANGE, ALREADY_EXISTS, FAILED_PRECONDITION, INTERNAL, RESOURCE_EXHAUSTED,
     UNAVAILABLE, UNKNOWN
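
A sketch of the intended call pattern; the storage client call is illustrative
only, and GoogleCloudStorage.createBucket() below uses the same check:

    // Hypothetical usage: treat both ALREADY_EXISTS and the interim
    // FAILED_PRECONDITION-with-matching-message case as "bucket exists".
    try {
      storage.create(BucketInfo.of("example-bucket"));
    } catch (StorageException e) {
      if (ErrorTypeExtractor.bucketAlreadyExists(e)) {
        // Safe to treat as success for mkdir-style semantics.
      }
    }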
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/FileInfo.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/FileInfo.java
index df8d63f5eec..3b9d9f475ae 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/FileInfo.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/FileInfo.java
@@ -1,11 +1,13 @@
 /*
- * Copyright 2013 Google Inc.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java
index 9c15962b7ef..d68eca6a8a5 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java
@@ -20,7 +20,12 @@
 
 import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.*;
 import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.isNullOrEmpty;
+import static java.lang.Math.toIntExact;
 
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.ExponentialBackOff;
+import com.google.api.client.util.Sleeper;
+import com.google.api.gax.paging.Page;
 import com.google.cloud.storage.*;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
@@ -32,21 +37,34 @@
 import java.io.IOException;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.FileAlreadyExistsException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
 * A wrapper around <a href="https://github.com/googleapis/java-storage">Google cloud storage
  * client</a>.
  */
 class GoogleCloudStorage {
-  public static final Logger LOG = LoggerFactory.getLogger(GoogleHadoopFileSystem.class);
+  static final Logger LOG = LoggerFactory.getLogger(GoogleHadoopFileSystem.class);
   static final List<Storage.BlobField> BLOB_FIELDS =
-      ImmutableList.of(Storage.BlobField.BUCKET, Storage.BlobField.CONTENT_ENCODING,
+      ImmutableList.of(
+          Storage.BlobField.BUCKET, Storage.BlobField.CONTENT_ENCODING,
           Storage.BlobField.CONTENT_TYPE, Storage.BlobField.CRC32C, Storage.BlobField.GENERATION,
           Storage.BlobField.METADATA, Storage.BlobField.MD5HASH, Storage.BlobField.METAGENERATION,
           Storage.BlobField.NAME, Storage.BlobField.SIZE, Storage.BlobField.TIME_CREATED,
           Storage.BlobField.UPDATED);
+
+  static final CreateObjectOptions EMPTY_OBJECT_CREATE_OPTIONS =
+      CreateObjectOptions.DEFAULT_OVERWRITE.toBuilder()
+          .setEnsureEmptyObjectsMetadataMatch(false)
+          .build();
+
   private final Storage storage;
   private final GoogleHadoopFileSystemConfiguration configuration;
 
@@ -55,13 +73,20 @@ class GoogleCloudStorage {
    * is in WIP.
    */
   GoogleCloudStorage(GoogleHadoopFileSystemConfiguration configuration) throws IOException {
-    // TODO: Set projectId
     // TODO: Set credentials
-    this.storage = StorageOptions.newBuilder().build().getService();
+    this.storage = createStorage(configuration.getProjectId());
     this.configuration = configuration;
   }
 
-  public WritableByteChannel create(final StorageResourceId resourceId, final CreateOptions options)
+  private static Storage createStorage(String projectId) {
+    if (projectId != null) {
+      return StorageOptions.newBuilder().setProjectId(projectId).build().getService();
+    }
+
+    return StorageOptions.newBuilder().build().getService();
+  }
+
+  WritableByteChannel create(final StorageResourceId resourceId, final CreateOptions options)
       throws IOException {
     LOG.trace("create({})", resourceId);
 
@@ -104,7 +129,7 @@ private long getWriteGeneration(StorageResourceId resourceId, boolean overwrite)
     throw new FileAlreadyExistsException(String.format("Object %s already exists.", resourceId));
   }
 
-  public void close() {
+  void close() {
     try {
       storage.close();
     } catch (Exception e) {
@@ -112,7 +137,7 @@ public void close() {
     }
   }
 
-  public GoogleCloudStorageItemInfo getItemInfo(StorageResourceId resourceId) throws IOException {
+  GoogleCloudStorageItemInfo getItemInfo(StorageResourceId resourceId) throws IOException {
     LOG.trace("getItemInfo({})", resourceId);
 
     // Handle ROOT case first.
@@ -258,4 +283,350 @@ private static GoogleCloudStorageItemInfo createItemInfoForBucket(StorageResourc
         bucket.getLocation(),
         bucket.getStorageClass() == null ? null : bucket.getStorageClass().name());
   }
+
+  List<GoogleCloudStorageItemInfo> listObjectInfo(
+      String bucketName,
+      String objectNamePrefix,
+      ListObjectOptions listOptions) throws IOException {
+    try {
+      long maxResults = listOptions.getMaxResults() > 0 ?
+          listOptions.getMaxResults() + (listOptions.isIncludePrefix() ? 0 : 1) :
+          listOptions.getMaxResults();
+
+      Storage.BlobListOption[] blobListOptions =
+          getBlobListOptions(objectNamePrefix, listOptions, maxResults);
+      Page<Blob> blobs = storage.list(bucketName, blobListOptions);
+      ListOperationResult result = new ListOperationResult(maxResults);
+      for (Blob blob : blobs.iterateAll()) {
+        result.add(blob);
+      }
+
+      return result.getItems();
+    } catch (StorageException e) {
+      throw new IOException(
+          String.format("listing object '%s' failed.", BlobId.of(bucketName, 
objectNamePrefix)),
+          e);
+    }
+  }
+
+  private Storage.BlobListOption[] getBlobListOptions(
+      String objectNamePrefix, ListObjectOptions listOptions, long maxResults) {
+    List<Storage.BlobListOption> options = new ArrayList<>();
+
+    options.add(Storage.BlobListOption.fields(BLOB_FIELDS.toArray(new Storage.BlobField[0])));
+    options.add(Storage.BlobListOption.prefix(objectNamePrefix));
+    // TODO: set max results as a BlobListOption
+    if ("/".equals(listOptions.getDelimiter())) {
+      options.add(Storage.BlobListOption.currentDirectory());
+    }
+
+    if (listOptions.getDelimiter() != null) {
+      options.add(Storage.BlobListOption.includeTrailingDelimiter());
+    }
+
+    return options.toArray(new Storage.BlobListOption[0]);
+  }
+
+  private GoogleCloudStorageItemInfo createItemInfoForBlob(Blob blob) {
+    long generationId = blob.getGeneration() == null ? 0L : blob.getGeneration();
+    StorageResourceId resourceId =
+        new StorageResourceId(blob.getBucket(), blob.getName(), generationId);
+    return createItemInfoForBlob(resourceId, blob);
+  }
+
+  void createBucket(String bucketName, CreateBucketOptions options) throws IOException {
+    LOG.trace("createBucket({})", bucketName);
+    checkArgument(!isNullOrEmpty(bucketName), "bucketName must not be null or empty");
+    checkNotNull(options, "options must not be null");
+
+    BucketInfo.Builder bucketInfoBuilder =
+        BucketInfo.newBuilder(bucketName).setLocation(options.getLocation());
+
+    if (options.getStorageClass() != null) {
+      bucketInfoBuilder.setStorageClass(
+          StorageClass.valueOfStrict(options.getStorageClass().toUpperCase()));
+    }
+    if (options.getTtl() != null) {
+      bucketInfoBuilder.setLifecycleRules(
+          Collections.singletonList(
+              new BucketInfo.LifecycleRule(
+                  BucketInfo.LifecycleRule.LifecycleAction.newDeleteAction(),
+                  BucketInfo.LifecycleRule.LifecycleCondition.newBuilder()
+                      .setAge(toIntExact(options.getTtl().toDays()))
+                      .build())));
+    }
+    try {
+      storage.create(bucketInfoBuilder.build());
+    } catch (StorageException e) {
+      if (ErrorTypeExtractor.bucketAlreadyExists(e)) {
+        throw (FileAlreadyExistsException)
+            new FileAlreadyExistsException(String.format("Bucket '%s' already 
exists.", bucketName))
+                .initCause(e);
+      }
+      throw new IOException(e);
+    }
+  }
+
+  void createEmptyObject(StorageResourceId resourceId) throws IOException {
+    LOG.trace("createEmptyObject({})", resourceId);
+    checkArgument(
+        resourceId.isStorageObject(), "Expected full StorageObject id, got %s", resourceId);
+    createEmptyObject(resourceId, EMPTY_OBJECT_CREATE_OPTIONS);
+  }
+
+  void createEmptyObject(StorageResourceId resourceId, CreateObjectOptions options)
+      throws IOException {
+    checkArgument(
+        resourceId.isStorageObject(), "Expected full StorageObject id, got %s", resourceId);
+
+    try {
+      createEmptyObjectInternal(resourceId, options);
+    } catch (StorageException e) {
+      if (canIgnoreExceptionForEmptyObject(e, resourceId, options)) {
+        LOG.info(
+            "Ignoring exception of type {}; verified object already exists 
with desired state.",
+            e.getClass().getSimpleName());
+        LOG.trace("Ignored exception while creating empty object: {}", 
resourceId, e);
+      } else {
+        if (ErrorTypeExtractor.getErrorType(e) == ErrorTypeExtractor.ErrorType.ALREADY_EXISTS) {
+          throw (FileAlreadyExistsException)
+              new FileAlreadyExistsException(
+                  String.format("Object '%s' already exists.", resourceId)
+              ).initCause(e);
+        }
+        throw new IOException(e);
+      }
+    }
+  }
+
+  /**
+   * Helper to check whether an empty object already exists with the expected metadata specified
+   * in {@code options}, to be used to determine whether it's safe to ignore an exception that was
+   * thrown when trying to create the object, {@code exceptionOnCreate}.
+   */
+  private boolean canIgnoreExceptionForEmptyObject(
+      StorageException exceptionOnCreate, StorageResourceId resourceId, CreateObjectOptions options)
+      throws IOException {
+    ErrorTypeExtractor.ErrorType errorType = ErrorTypeExtractor.getErrorType(exceptionOnCreate);
+    if (shouldBackoff(resourceId, errorType)) {
+      GoogleCloudStorageItemInfo existingInfo;
+      Duration maxWaitTime = Duration.ofSeconds(3); // TODO: make this configurable
+
+      BackOff backOff =
+          !maxWaitTime.isZero() && !maxWaitTime.isNegative()
+              ? new ExponentialBackOff.Builder()
+              .setMaxElapsedTimeMillis(toIntExact(maxWaitTime.toMillis()))
+              .setMaxIntervalMillis(500)
+              .setInitialIntervalMillis(100)
+              .setMultiplier(1.5)
+              .setRandomizationFactor(0.15)
+              .build()
+              : BackOff.STOP_BACKOFF;
+      long nextSleep = 0L;
+      do {
+        if (nextSleep > 0) {
+          try {
+            Sleeper.DEFAULT.sleep(nextSleep);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            nextSleep = BackOff.STOP;
+          }
+        }
+        existingInfo = getItemInfo(resourceId);
+        nextSleep = nextSleep == BackOff.STOP ? BackOff.STOP : backOff.nextBackOffMillis();
+      } while (!existingInfo.exists() && nextSleep != BackOff.STOP);
+
+      // Compare existence, size, and metadata; for 429 errors creating an empty object,
+      // we don't care about metaGeneration/contentGeneration as long as the metadata
+      // matches, since we don't know for sure whether our low-level request succeeded
+      // first or some other client succeeded first.
+      if (existingInfo.exists() && existingInfo.getSize() == 0) {
+        if (options.isEnsureEmptyObjectsMetadataMatch()) {
+          return existingInfo.metadataEquals(options.getMetadata());
+        }
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static boolean shouldBackoff(StorageResourceId resourceId,
+      ErrorTypeExtractor.ErrorType errorType) {
+    return errorType == ErrorTypeExtractor.ErrorType.RESOURCE_EXHAUSTED
+        || errorType == ErrorTypeExtractor.ErrorType.INTERNAL
+        || (resourceId.isDirectory()
+            && errorType == ErrorTypeExtractor.ErrorType.FAILED_PRECONDITION);
+  }
+
+  private void createEmptyObjectInternal(
+      StorageResourceId resourceId, CreateObjectOptions createObjectOptions) throws IOException {
+    Map<String, String> rewrittenMetadata = encodeMetadata(createObjectOptions.getMetadata());
+
+    List<Storage.BlobTargetOption> blobTargetOptions = new ArrayList<>();
+    blobTargetOptions.add(Storage.BlobTargetOption.disableGzipContent());
+    if (resourceId.hasGenerationId()) {
+      blobTargetOptions.add(
+          Storage.BlobTargetOption.generationMatch(resourceId.getGenerationId()));
+    } else if (resourceId.isDirectory() || !createObjectOptions.isOverwriteExisting()) {
+      blobTargetOptions.add(Storage.BlobTargetOption.doesNotExist());
+    }
+
+    try {
+      // TODO: Set encryption key and related properties
+      storage.create(
+          BlobInfo.newBuilder(BlobId.of(resourceId.getBucketName(), resourceId.getObjectName()))
+              .setMetadata(rewrittenMetadata)
+              .setContentEncoding(createObjectOptions.getContentEncoding())
+              .setContentType(createObjectOptions.getContentType())
+              .build(),
+          blobTargetOptions.toArray(new Storage.BlobTargetOption[0]));
+    } catch (StorageException e) {
+      throw new IOException(String.format("Creating empty object %s failed.", 
resourceId), e);
+    }
+  }
+
+  private static Map<String, String> encodeMetadata(Map<String, byte[]> metadata) {
+    return Maps.transformValues(metadata, GoogleCloudStorage::encodeMetadataValues);
+  }
+
+  private static String encodeMetadataValues(byte[] bytes) {
+    return bytes == null ? null : BaseEncoding.base64().encode(bytes);
+  }
+
+  List<GoogleCloudStorageItemInfo> listDirectoryRecursive(String bucketName, String objectName)
+      throws IOException {
+    // TODO: Take delimiter from config
+    // TODO: Set specific fields
+
+    try {
+      Page<Blob> blobs = storage.list(
+          bucketName,
+          Storage.BlobListOption.prefix(objectName));
+
+      List<GoogleCloudStorageItemInfo> result = new ArrayList<>();
+      for (Blob blob : blobs.iterateAll()) {
+        result.add(createItemInfoForBlob(blob));
+      }
+
+      return result;
+    } catch (StorageException e) {
+      throw new IOException(
+          String.format("Listing '%s' failed", BlobId.of(bucketName, 
objectName)), e);
+    }
+  }
+
+  void deleteObjects(List<StorageResourceId> fullObjectNames) throws IOException {
+    LOG.trace("deleteObjects({})", fullObjectNames);
+
+    if (fullObjectNames.isEmpty()) {
+      return;
+    }
+
+    // Validate that all the elements represent StorageObjects.
+    for (StorageResourceId toDelete : fullObjectNames) {
+      checkArgument(
+          toDelete.isStorageObject(),
+          "Expected full StorageObject names only, got: %s",
+          toDelete);
+    }
+
+    // TODO: Do this concurrently
+    // TODO: There is duplication. Fix it.
+    for (StorageResourceId toDelete : fullObjectNames) {
+      try {
+        LOG.trace("Deleting Object ({})", toDelete);
+        if (toDelete.hasGenerationId() && toDelete.getGenerationId() != 0) {
+          storage.delete(
+              BlobId.of(toDelete.getBucketName(), toDelete.getObjectName()),
+              Storage.BlobSourceOption.generationMatch(toDelete.getGenerationId()));
+        } else {
+          // TODO: Remove delete without generationId
+          storage.delete(BlobId.of(toDelete.getBucketName(), toDelete.getObjectName()));
+
+          LOG.trace("Deleting Object without generationId ({})", toDelete);
+        }
+      } catch (StorageException e) {
+        throw new IOException(String.format("Deleting resource %s failed.", 
toDelete), e);
+      }
+    }
+  }
+
+  List<GoogleCloudStorageItemInfo> listBucketInfo() throws IOException {
+    List<Bucket> allBuckets = listBucketsInternal();
+    List<GoogleCloudStorageItemInfo> bucketInfos = new ArrayList<>(allBuckets.size());
+    for (Bucket bucket : allBuckets) {
+      bucketInfos.add(createItemInfoForBucket(new StorageResourceId(bucket.getName()), bucket));
+    }
+    return bucketInfos;
+  }
+
+  private List<Bucket> listBucketsInternal() throws IOException {
+    checkNotNull(configuration.getProjectId(), "projectId must not be null");
+    List<Bucket> allBuckets = new ArrayList<>();
+    try {
+      Page<Bucket> buckets =
+          storage.list(
+              Storage.BucketListOption.pageSize(configuration.getMaxListItemsPerCall()),
+              Storage.BucketListOption.fields(
+                  Storage.BucketField.LOCATION,
+                  Storage.BucketField.STORAGE_CLASS,
+                  Storage.BucketField.TIME_CREATED,
+                  Storage.BucketField.UPDATED));
+
+      // Loop to fetch all the items.
+      for (Bucket bucket : buckets.iterateAll()) {
+        allBuckets.add(bucket);
+      }
+    } catch (StorageException e) {
+      throw new IOException(e);
+    }
+    return allBuckets;
+  }
+
+  // Helper class to capture the results of list operation.
+  private class ListOperationResult {
+    private final Map<String, Blob> prefixes = new HashMap<>();
+    private final List<Blob> objects = new ArrayList<>();
+
+    private final Set<String> objectsSet = new HashSet<>();
+
+    private final long maxResults;
+
+    ListOperationResult(long maxResults) {
+      this.maxResults = maxResults;
+    }
+
+    void add(Blob blob) {
+      String path = blob.getBlobId().toGsUtilUri();
+      if (blob.getGeneration() != null) {
+        prefixes.remove(path);
+        objects.add(blob);
+
+        objectsSet.add(path);
+      } else if (!objectsSet.contains(path)) {
+        prefixes.put(path, blob);
+      }
+    }
+
+    List<GoogleCloudStorageItemInfo> getItems() {
+      List<GoogleCloudStorageItemInfo> result = new ArrayList<>(prefixes.size() + objects.size());
+
+      for (Blob blob : objects) {
+        result.add(createItemInfoForBlob(blob));
+
+        if (result.size() == maxResults) {
+          return result;
+        }
+      }
+
+      for (Blob blob : prefixes.values()) {
+        if (result.size() == maxResults) {
+          return result;
+        }
+
+        result.add(createItemInfoForBlob(blob));
+      }
+
+      return result;
+    }
+  }
 }
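
Together, createBucket() and createEmptyObject() are what mkdir reduces to at
this layer. A sketch (names are placeholders; gcs is an existing
GoogleCloudStorage instance, and this assumes StorageResourceId's two-argument
(bucket, object) constructor):

    // Hypothetical flow: directories are modeled as zero-byte objects whose
    // names end with '/'; createEmptyObject() tolerates benign creation races
    // via canIgnoreExceptionForEmptyObject().
    gcs.createBucket("example-bucket", CreateBucketOptions.DEFAULT);
    gcs.createEmptyObject(new StorageResourceId("example-bucket", "logs/2025/06/10/"));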
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java
index e411f22eb39..aa1617e4da6 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java
@@ -19,21 +19,57 @@
 package org.apache.hadoop.fs.gs;
 
 import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.*;
+import static java.util.Comparator.comparing;
+import static org.apache.hadoop.fs.gs.Constants.PATH_DELIMITER;
 import static org.apache.hadoop.fs.gs.Constants.SCHEME;
 
 import com.google.auth.Credentials;
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.channels.WritableByteChannel;
+import java.nio.file.DirectoryNotEmptyException;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
 
 /**
  * Provides FS semantics over GCS based on Objects API.
  */
 class GoogleCloudStorageFileSystem {
   private static final Logger LOG = LoggerFactory.getLogger(StorageResourceId.class);
+  // Comparator used for sorting paths.
+  //
+  // For some bulk operations, we need to operate on parent directories before
+  // we operate on their children. To achieve this, we sort paths such that
+  // shorter paths appear before longer paths. Also, we sort lexicographically
+  // within paths of the same length (this is not strictly required but helps when
+  // debugging/testing).
+  @VisibleForTesting
+  static final Comparator<URI> PATH_COMPARATOR =
+      comparing(
+          URI::toString,
+          (as, bs) ->
+              (as.length() == bs.length())
+                  ? as.compareTo(bs)
+                  : Integer.compare(as.length(), bs.length()));
+
+  static final Comparator<FileInfo> FILE_INFO_PATH_COMPARATOR =
+      comparing(FileInfo::getPath, PATH_COMPARATOR);
+
+  private static final ListObjectOptions GET_FILE_INFO_LIST_OPTIONS =
+      ListObjectOptions.DEFAULT.builder().setIncludePrefix(true).setMaxResults(1).build();
+
+  private static final ListObjectOptions LIST_FILE_INFO_LIST_OPTIONS =
+      ListObjectOptions.DEFAULT.builder().setIncludePrefix(true).build();
 
   // URI of the root path.
   static final URI GCSROOT = URI.create(SCHEME + ":/");
@@ -86,4 +122,253 @@ void close() {
       gcs = null;
     }
   }
+
+  public FileInfo getFileInfo(URI path) throws IOException {
+    checkArgument(path != null, "path must not be null");
+    // Validate the given path. true == allow empty object name.
+    // One should be able to get info about top level directory (== bucket),
+    // therefore we allow object name to be empty.
+    StorageResourceId resourceId = StorageResourceId.fromUriPath(path, true);
+    FileInfo fileInfo =
+        FileInfo.fromItemInfo(
+            getFileInfoInternal(resourceId, /* inferImplicitDirectories= */ true));
+    LOG.trace("getFileInfo(path: {}): {}", path, fileInfo);
+    return fileInfo;
+  }
+
+  private GoogleCloudStorageItemInfo getFileInfoInternal(
+      StorageResourceId resourceId,
+      boolean inferImplicitDirectories)
+      throws IOException {
+    if (resourceId.isRoot() || resourceId.isBucket()) {
+      return gcs.getItemInfo(resourceId);
+    }
+
+    StorageResourceId dirId = resourceId.toDirectoryId();
+    if (!resourceId.isDirectory()) {
+      GoogleCloudStorageItemInfo itemInfo = gcs.getItemInfo(resourceId);
+      if (itemInfo.exists()) {
+        return itemInfo;
+      }
+
+      if (inferImplicitDirectories) {
+        // TODO: Set max result
+        List<GoogleCloudStorageItemInfo> listDirResult = gcs.listObjectInfo(
+            resourceId.getBucketName(),
+            resourceId.getObjectName(),
+            GET_FILE_INFO_LIST_OPTIONS);
+        LOG.trace("List for getMetadata returned {}. {}", 
listDirResult.size(), listDirResult);
+        if (!listDirResult.isEmpty()) {
+          LOG.trace("Get metadata for directory returned non empty {}", 
listDirResult);
+          return GoogleCloudStorageItemInfo.createInferredDirectory(resourceId.toDirectoryId());
+        }
+      }
+    }
+
+    List<GoogleCloudStorageItemInfo> listDirInfo = ImmutableList.of(gcs.getItemInfo(dirId));
+    if (listDirInfo.isEmpty()) {
+      return GoogleCloudStorageItemInfo.createNotFound(resourceId);
+    }
+    checkState(listDirInfo.size() <= 2, "listed more than 2 objects: '%s'", listDirInfo);
+    GoogleCloudStorageItemInfo dirInfo = Iterables.get(listDirInfo, /* position= */ 0);
+    checkState(
+        dirInfo.getResourceId().equals(dirId) || !inferImplicitDirectories,
+        "listed wrong object '%s', but should be '%s'",
+        dirInfo.getResourceId(),
+        resourceId);
+    return dirInfo.getResourceId().equals(dirId) && dirInfo.exists()
+        ? dirInfo
+        : GoogleCloudStorageItemInfo.createNotFound(resourceId);
+  }
+
+  public void mkdirs(URI path) throws IOException {
+    LOG.trace("mkdirs(path: {})", path);
+    checkNotNull(path, "path should not be null");
+
+    StorageResourceId resourceId =
+        StorageResourceId.fromUriPath(path, /* allowEmptyObjectName= */ true);
+    if (resourceId.isRoot()) {
+      // GCS_ROOT directory always exists, no need to go through the rest of the method.
+      return;
+    }
+
+    // In case the path is a bucket, we just attempt to create it without additional checks.
+    if (resourceId.isBucket()) {
+      try {
+        gcs.createBucket(resourceId.getBucketName(), CreateBucketOptions.DEFAULT);
+      } catch (FileAlreadyExistsException e) {
+        // This means that bucket already exist, and we do not need to do 
anything.
+        LOG.trace("mkdirs: {} already exists, ignoring creation failure", 
resourceId, e);
+      }
+      return;
+    }
+
+    resourceId = resourceId.toDirectoryId();
+
+    // TODO: Before creating a leaf directory we need to check if there are no conflicting files
+    // TODO: with the same name as any subdirectory
+
+    // Create only a leaf directory because subdirectories will be inferred
+    // if leaf directory exists
+    try {
+      gcs.createEmptyObject(resourceId);
+    } catch (FileAlreadyExistsException e) {
+      // This means that the directory object already exists, and we do not need to do anything.
+      LOG.trace("mkdirs: {} already exists, ignoring creation failure", resourceId, e);
+    }
+  }
+
+  void delete(URI path, boolean recursive) throws IOException {
+    checkNotNull(path, "path should not be null");
+    checkArgument(!path.equals(GCSROOT), "Cannot delete root path (%s)", path);
+
+    FileInfo fileInfo = getFileInfo(path);
+    if (!fileInfo.exists()) {
+      throw new FileNotFoundException("Item not found: " + path);
+    }
+
+    List<FileInfo> itemsToDelete;
+    // Delete sub-items if it is a directory.
+    if (fileInfo.isDirectory()) {
+      itemsToDelete =
+          recursive
+              ? listRecursive(fileInfo.getPath()) // TODO: Get only one result
+              : listDirectory(fileInfo.getPath());
+
+      if (!itemsToDelete.isEmpty() && !recursive) {
+        throw new DirectoryNotEmptyException("Cannot delete a non-empty directory: " + path);
+      }
+    } else {
+      itemsToDelete = new ArrayList<>();
+    }
+
+    List<FileInfo> bucketsToDelete = new ArrayList<>();
+    (fileInfo.getItemInfo().isBucket() ? bucketsToDelete : itemsToDelete).add(fileInfo);
+
+    deleteObjects(itemsToDelete, bucketsToDelete);
+
+    StorageResourceId parentId =
+        StorageResourceId.fromUriPath(UriPaths.getParentPath(path), true);
+    GoogleCloudStorageItemInfo parentInfo =
+        getFileInfoInternal(parentId, /* inferImplicitDirectories= */ false);
+
+    StorageResourceId resourceId = parentInfo.getResourceId();
+    if (parentInfo.exists()
+        || resourceId.isRoot()
+        || resourceId.isBucket()
+        || PATH_DELIMITER.equals(resourceId.getObjectName())) {
+      return;
+    }
+
+    // TODO: Keep the repair parent step behind a flag
+    gcs.createEmptyObject(parentId);
+  }
+
+  private List<FileInfo> listRecursive(URI prefix) throws IOException {
+    StorageResourceId prefixId = getPrefixId(prefix);
+    List<GoogleCloudStorageItemInfo> itemInfos =
+        gcs.listDirectoryRecursive(prefixId.getBucketName(), prefixId.getObjectName());
+    List<FileInfo> fileInfos = FileInfo.fromItemInfos(itemInfos);
+    fileInfos.sort(FILE_INFO_PATH_COMPARATOR);
+    return fileInfos;
+  }
+
+  private List<FileInfo> listDirectory(URI prefix) throws IOException {
+    StorageResourceId prefixId = getPrefixId(prefix);
+    List<GoogleCloudStorageItemInfo> itemInfos = gcs.listObjectInfo(
+        prefixId.getBucketName(),
+        prefixId.getObjectName(),
+        ListObjectOptions.DEFAULT_FLAT_LIST);
+
+    List<FileInfo> fileInfos = FileInfo.fromItemInfos(itemInfos);
+    fileInfos.sort(FILE_INFO_PATH_COMPARATOR);
+    return fileInfos;
+  }
+
+  private StorageResourceId getPrefixId(URI prefix) {
+    checkNotNull(prefix, "prefix must not be null");
+
+    StorageResourceId prefixId = StorageResourceId.fromUriPath(prefix, true);
+    checkArgument(!prefixId.isRoot(), "prefix must not be global root, got '%s'", prefix);
+
+    return prefixId;
+  }
+
+  private void deleteObjects(
+      List<FileInfo> itemsToDelete, List<FileInfo> bucketsToDelete)
+      throws IOException {
+    LOG.trace("deleteInternalWithFolders; fileSize={} bucketSize={}",
+        itemsToDelete.size(), bucketsToDelete.size());
+    deleteObjects(itemsToDelete);
+    deleteBucket(bucketsToDelete);
+  }
+
+  private void deleteObjects(List<FileInfo> itemsToDelete) throws IOException {
+    // Delete children before their parents.
+    //
+    // Note: we modify the input list, which is ok for current usage.
+    // We should make a copy in case that changes in future.
+    itemsToDelete.sort(FILE_INFO_PATH_COMPARATOR.reversed());
+
+    if (!itemsToDelete.isEmpty()) {
+      List<StorageResourceId> objectsToDelete = new ArrayList<>(itemsToDelete.size());
+      for (FileInfo fileInfo : itemsToDelete) {
+        if (!fileInfo.isInferredDirectory()) {
+          objectsToDelete.add(
+              new StorageResourceId(
+                  fileInfo.getItemInfo().getBucketName(),
+                  fileInfo.getItemInfo().getObjectName(),
+                  fileInfo.getItemInfo().getContentGeneration()));
+        }
+      }
+
+      gcs.deleteObjects(objectsToDelete);
+    }
+  }
+
+  private void deleteBucket(List<FileInfo> bucketsToDelete) throws IOException {
+    if (bucketsToDelete == null || bucketsToDelete.isEmpty()) {
+      return;
+    }
+
+    // TODO: Add support for deleting bucket
+    throw new UnsupportedOperationException("deleteBucket is not supported.");
+  }
+
+  public List<FileInfo> listFileInfo(URI path, ListFileOptions listOptions) throws IOException {
+    checkNotNull(path, "path must not be null");
+    LOG.trace("listFileInfo(path: {})", path);
+
+    StorageResourceId pathId =
+        StorageResourceId.fromUriPath(path, /* allowEmptyObjectName= */ true);
+
+    if (!pathId.isDirectory()) {
+      GoogleCloudStorageItemInfo pathInfo = gcs.getItemInfo(pathId);
+      if (pathInfo.exists()) {
+        List<FileInfo> listedInfo = new ArrayList<>();
+        listedInfo.add(FileInfo.fromItemInfo(pathInfo));
+
+        return listedInfo;
+      }
+    }
+
+    StorageResourceId dirId = pathId.toDirectoryId();
+    List<GoogleCloudStorageItemInfo> dirItemInfos = dirId.isRoot() ?
+        gcs.listBucketInfo() :
+        gcs.listObjectInfo(
+            dirId.getBucketName(), dirId.getObjectName(), LIST_FILE_INFO_LIST_OPTIONS);
+
+    if (pathId.isStorageObject() && dirItemInfos.isEmpty()) {
+      throw new FileNotFoundException("Item not found: " + path);
+    }
+
+    if (!dirItemInfos.isEmpty() && Objects.equals(dirItemInfos.get(0).getResourceId(), dirId)) {
+      dirItemInfos.remove(0);
+    }
+
+    List<FileInfo> fileInfos = FileInfo.fromItemInfos(dirItemInfos);
+    fileInfos.sort(FILE_INFO_PATH_COMPARATOR);
+    return fileInfos;
+  }
 }
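
For illustration, how the listing semantics above surface at the FileSystem level
(again a sketch using the hypothetical fs handle; not part of the patch):

    // listStatus on a file yields exactly that entry; on a directory it yields the
    // children sorted by path, with the directory's own placeholder filtered out.
    for (org.apache.hadoop.fs.FileStatus st : fs.listStatus(new Path("/a/b"))) {
      System.out.println(st.getPath() + (st.isDirectory() ? "/" : ""));
    }
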
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageItemInfo.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageItemInfo.java
index 887e68b05f9..83169b8d921 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageItemInfo.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageItemInfo.java
@@ -1,11 +1,13 @@
 /*
- * Copyright 2013 Google Inc.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java
index 1c2fc19d2b5..8831568a356 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java
@@ -32,12 +32,16 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
+import java.nio.file.DirectoryNotEmptyException;
+import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.ProviderUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 
 import org.slf4j.Logger;
@@ -273,7 +277,6 @@ public FSDataOutputStream create(Path hadoopPath, FsPermission permission, boole
     checkArgument(replication > 0, "replication must be a positive integer: %s", replication);
     checkArgument(blockSize > 0, "blockSize must be a positive integer: %s", blockSize);
 
-    System.out.println(String.format("create(%s)", hadoopPath));
     checkOpen();
 
     LOG.trace("create(hadoopPath: {}, overwrite: {}, bufferSize: {} [ignored])", hadoopPath,
@@ -289,10 +292,32 @@ public FSDataOutputStream create(Path hadoopPath, FsPermission permission, boole
   }
 
   @Override
-  public FSDataOutputStream createNonRecursive(Path hadoopPath, FsPermission permission,
-      EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
-      Progressable progress) throws IOException {
-    throw new UnsupportedOperationException(hadoopPath.toString());
+  public FSDataOutputStream createNonRecursive(
+      Path hadoopPath,
+      FsPermission permission,
+      EnumSet<CreateFlag> flags,
+      int bufferSize,
+      short replication,
+      long blockSize,
+      Progressable progress)
+      throws IOException {
+    URI gcsPath = getGcsPath(checkNotNull(hadoopPath, "hadoopPath must not be null"));
+    URI parentGcsPath = UriPaths.getParentPath(gcsPath);
+    if (!getGcsFs().getFileInfo(parentGcsPath).exists()) {
+      throw new FileNotFoundException(
+          String.format(
+              "Can not create '%s' file, because parent folder does not exist: 
%s",
+              gcsPath, parentGcsPath));
+    }
+
+    return create(
+        hadoopPath,
+        permission,
+        flags.contains(CreateFlag.OVERWRITE),
+        bufferSize,
+        replication,
+        blockSize,
+        progress);
   }
 
   @Override
@@ -308,19 +333,57 @@ public boolean rename(final Path path, final Path path1) throws IOException {
   }
 
   @Override
-  public boolean delete(final Path path, final boolean recursive) throws IOException {
-    LOG.trace("delete({}, {})", path, recursive);
-    throw new UnsupportedOperationException(path.toString());
+  public boolean delete(final Path hadoopPath, final boolean recursive) throws IOException {
+    LOG.trace("delete({}, {})", hadoopPath, recursive);
+    checkArgument(hadoopPath != null, "hadoopPath must not be null");
+
+    checkOpen();
+
+    URI gcsPath = getGcsPath(hadoopPath);
+    try {
+      getGcsFs().delete(gcsPath, recursive);
+    } catch (DirectoryNotEmptyException e) {
+      throw e;
+    } catch (IOException e) {
+      if (ApiErrorExtractor.INSTANCE.requestFailure(e)) {
+        throw e;
+      }
+      LOG.trace("delete(hadoopPath: {}, recursive: {}): false [failed]", 
hadoopPath, recursive, e);
+      return false;
+    }
+
+    LOG.trace("delete(hadoopPath: %s, recursive: %b): true", hadoopPath, 
recursive);
+    return true;
   }
 
   @Override
-  public FileStatus[] listStatus(final Path path) throws FileNotFoundException, IOException {
-    checkArgument(path != null, "hadoopPath must not be null");
+  public FileStatus[] listStatus(final Path hadoopPath) throws IOException {
+    checkArgument(hadoopPath != null, "hadoopPath must not be null");
 
     checkOpen();
 
-    LOG.trace("listStatus(hadoopPath: {})", path);
-    throw new UnsupportedOperationException(path.toString());
+    LOG.trace("listStatus(hadoopPath: {})", hadoopPath);
+
+    URI gcsPath = getGcsPath(hadoopPath);
+    List<FileStatus> status;
+
+    try {
+      List<FileInfo> fileInfos = getGcsFs().listFileInfo(gcsPath, ListFileOptions.OBJECTFIELDS);
+      status = new ArrayList<>(fileInfos.size());
+      String userName = getUgiUserName();
+      for (FileInfo fileInfo : fileInfos) {
+        status.add(getFileStatus(fileInfo, userName));
+      }
+    } catch (FileNotFoundException fnfe) {
+      throw (FileNotFoundException)
+          new FileNotFoundException(
+              String.format(
+                  "listStatus(hadoopPath: %s): '%s' does not exist.",
+                  hadoopPath, gcsPath))
+              .initCause(fnfe);
+    }
+
+    return status.toArray(new FileStatus[0]);
   }
 
   /**
@@ -402,18 +465,29 @@ public Path getWorkingDirectory() {
   }
 
   @Override
-  public boolean mkdirs(final Path path, final FsPermission fsPermission) throws IOException {
-    LOG.trace("mkdirs({})", path);
-    throw new UnsupportedOperationException(path.toString());
-  }
+  public boolean mkdirs(final Path hadoopPath, final FsPermission permission) throws IOException {
+    checkArgument(hadoopPath != null, "hadoopPath must not be null");
 
-//  /**
-//   * Gets the default replication factor.
-//   */
-//  @Override
-//  public short getDefaultReplication() {
-//    return REPLICATION_FACTOR_DEFAULT;
-//  }
+    LOG.trace("mkdirs(hadoopPath: {}, permission: {}): true", hadoopPath, permission);
+
+    checkOpen();
+
+    URI gcsPath = getGcsPath(hadoopPath);
+    try {
+      getGcsFs().mkdirs(gcsPath);
+    } catch (java.nio.file.FileAlreadyExistsException faee) {
+      // Need to convert to the Hadoop flavor of FileAlreadyExistsException.
+      throw (FileAlreadyExistsException)
+          new FileAlreadyExistsException(
+              String.format(
+                  "mkdirs(hadoopPath: %s, permission: %s): failed",
+                  hadoopPath, permission))
+              .initCause(faee);
+    }
+
+    return true;
+  }
 
   @Override
   public FileStatus getFileStatus(final Path path) throws IOException {
@@ -423,9 +497,14 @@ public FileStatus getFileStatus(final Path path) throws IOException {
 
     URI gcsPath = getGcsPath(path);
 
-    LOG.trace("getFileStatus(): {}", gcsPath);
-
-    throw new UnsupportedOperationException(path.toString());
+    FileInfo fileInfo = getGcsFs().getFileInfo(gcsPath);
+    if (!fileInfo.exists()) {
+      throw new FileNotFoundException(
+          String.format(
+              "%s not found: %s", fileInfo.isDirectory() ? "Directory" : 
"File", path));
+    }
+    String userName = getUgiUserName();
+    return getFileStatus(fileInfo, userName);
   }
 
   /**
@@ -502,4 +581,29 @@ public void setWorkingDirectory(final Path hadoopPath) {
     workingDirectory = getHadoopPath(gcsPath);
     LOG.trace("setWorkingDirectory(hadoopPath: {}): {}", hadoopPath, 
workingDirectory);
   }
+
+  private static String getUgiUserName() throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    return ugi.getShortUserName();
+  }
+
+  private FileStatus getFileStatus(FileInfo fileInfo, String userName) {
+    checkNotNull(fileInfo, "fileInfo should not be null");
+    // GCS does not provide modification time. It only provides creation time.
+    // It works for objects because they are immutable once created.
+    FileStatus status = new FileStatus(
+        fileInfo.getSize(),
+        fileInfo.isDirectory(),
+        REPLICATION_FACTOR_DEFAULT,
+        defaultBlockSize,
+        fileInfo.getModificationTime(),
+        fileInfo.getModificationTime(),
+        reportedPermissions,
+        userName,
+        userName,
+        getHadoopPath(fileInfo.getPath()));
+    LOG.trace("FileStatus(path: {}, userName: {}): {}", fileInfo.getPath(), 
userName, status);
+    return status;
+  }
 }
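
A short sketch of the timestamp behaviour noted in the comment above (hedged; same
hypothetical fs handle as before): since GCS reports only creation time and objects
are immutable, the patch fills both FileStatus timestamps from it.

    org.apache.hadoop.fs.FileStatus st = fs.getFileStatus(new Path("/a/b/c"));
    // Both values come from the object's creation time.
    assert st.getModificationTime() == st.getAccessTime();
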
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java
index 16d940b16f4..a480a72e60b 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java
@@ -34,28 +34,29 @@ class GoogleHadoopFileSystemConfiguration {
    * querying the value. Modifying this value allows one to control how many mappers are used to
    * process a given file.
    */
-  public static final HadoopConfigurationProperty<Long> BLOCK_SIZE =
+  static final HadoopConfigurationProperty<Long> BLOCK_SIZE =
       new HadoopConfigurationProperty<>("fs.gs.block.size", 64 * 1024 * 1024L);
 
   /**
    * Configuration key for GCS project ID. Default value: none
    */
-  public static final HadoopConfigurationProperty<String> GCS_PROJECT_ID =
+  static final HadoopConfigurationProperty<String> GCS_PROJECT_ID =
       new HadoopConfigurationProperty<>("fs.gs.project.id");
 
   /**
    * Configuration key for initial working directory of a GHFS instance. 
Default value: '/'
    */
-  public static final HadoopConfigurationProperty<String> GCS_WORKING_DIRECTORY =
+  static final HadoopConfigurationProperty<String> GCS_WORKING_DIRECTORY =
       new HadoopConfigurationProperty<>("fs.gs.working.dir", "/");
 
   /**
    * Configuration key for setting write buffer size.
    */
-  public static final HadoopConfigurationProperty<Long> GCS_OUTPUT_STREAM_BUFFER_SIZE =
+  static final HadoopConfigurationProperty<Long> GCS_OUTPUT_STREAM_BUFFER_SIZE =
       new HadoopConfigurationProperty<>("fs.gs.outputstream.buffer.size", 8L * 1024 * 1024);
 
   private final String workingDirectory;
+  private final String projectId;
 
   public int getOutStreamBufferSize() {
     return outStreamBufferSize;
@@ -67,9 +68,18 @@ public int getOutStreamBufferSize() {
     this.workingDirectory = GCS_WORKING_DIRECTORY.get(config, config::get);
     this.outStreamBufferSize =
         toIntExact(GCS_OUTPUT_STREAM_BUFFER_SIZE.get(config, config::getLongBytes));
+    this.projectId = GCS_PROJECT_ID.get(config, config::get);
   }
 
   public String getWorkingDirectory() {
     return this.workingDirectory;
   }
+
+  String getProjectId() {
+    return this.projectId;
+  }
+
+  public long getMaxListItemsPerCall() {
+    return 5000L; // TODO: Make this configurable
+  }
 }
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/IoExceptionHelper.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/IoExceptionHelper.java
new file mode 100644
index 00000000000..c68a6cac1a1
--- /dev/null
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/IoExceptionHelper.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs;
+
+import java.io.IOError;
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import javax.net.ssl.SSLException;
+
+/**
+ * Translates exceptions from API calls into higher-level meaning, while allowing injectability for
+ * testing how API errors are handled.
+ */
+public final class IoExceptionHelper {
+
+  private IoExceptionHelper() {}
+
+  /**
+   * Determines if a given {@link Throwable} is caused by an IO error.
+   *
+   * <p>Recursively checks {@code getCause()} if the outer exception isn't an instance of the correct
+   * class.
+   *
+   * @param throwable The {@link Throwable} to check.
+   * @return True if the {@link Throwable} is a result of an IO error.
+   */
+  public static boolean isIoError(Throwable throwable) {
+    if (throwable instanceof IOException || throwable instanceof IOError) {
+      return true;
+    }
+    Throwable cause = throwable.getCause();
+    return cause != null && isIoError(cause);
+  }
+
+  /**
+   * Determines if a given {@link Throwable} is caused by a socket error.
+   *
+   * <p>Recursively checks {@code getCause()} if the outer exception isn't an instance of the correct
+   * class.
+   *
+   * @param throwable The {@link Throwable} to check.
+   * @return True if the {@link Throwable} is a result of a socket error.
+   */
+  public static boolean isSocketError(Throwable throwable) {
+    if (throwable instanceof SocketException || throwable instanceof SocketTimeoutException) {
+      return true;
+    }
+    Throwable cause = throwable.getCause();
+    // A subset of SSL exceptions caused by IO errors (e.g. SSLHandshakeException due to
+    // unexpected connection closure) also counts as a socket error.
+    if (throwable instanceof SSLException && cause != null && isIoError(cause)) {
+      return true;
+    }
+    return cause != null && isSocketError(cause);
+  }
+
+  /**
+   * Determines if a given {@link IOException} is caused by a timed out read.
+   *
+   * @param e The {@link IOException} to check.
+   * @return True if the {@link IOException} is a result of a read timeout.
+   */
+  public static boolean isReadTimedOut(IOException e) {
+    return e instanceof SocketTimeoutException
+        && "Read timed out".equalsIgnoreCase(e.getMessage());
+  }
+}
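
For illustration, one way a caller might combine these predicates into a retry
decision (a hedged sketch, not part of the patch; RetryGuard and shouldRetry are
hypothetical names):

    import java.io.IOException;
    import org.apache.hadoop.fs.gs.IoExceptionHelper;

    public final class RetryGuard {
      private RetryGuard() {}

      // Socket-level failures and read timeouts are usually transient, so retry those.
      public static boolean shouldRetry(IOException e) {
        return IoExceptionHelper.isSocketError(e) || IoExceptionHelper.isReadTimedOut(e);
      }
    }
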
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java
new file mode 100644
index 00000000000..2bc74c6fc21
--- /dev/null
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs;
+
+import javax.annotation.Nonnull;
+
+final class ListFileOptions {
+  static final ListFileOptions OBJECTFIELDS = new ListFileOptions("bucket,name,size,updated");
+  private final String fields;
+
+  private ListFileOptions(@Nonnull String fields) {
+    this.fields = fields;
+  }
+
+  String getFields() {
+    return fields;
+  }
+}
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListObjectOptions.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListObjectOptions.java
new file mode 100644
index 00000000000..60ec409b5c7
--- /dev/null
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListObjectOptions.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs;
+
+import javax.annotation.Nullable;
+
+import static org.apache.hadoop.fs.gs.Constants.PATH_DELIMITER;
+
+/** Options that can be specified when listing objects in the {@link GoogleCloudStorage}. */
+final class ListObjectOptions {
+
+  /** List all objects in the directory. */
+  public static final ListObjectOptions DEFAULT = new Builder().build();
+
+  /** List all objects with the prefix. */
+  public static final ListObjectOptions DEFAULT_FLAT_LIST =
+      DEFAULT.builder().setDelimiter(null).build();
+
+  Builder builder() {
+    Builder result = new Builder();
+    result.fields = fields;
+    result.delimiter = delimiter;
+    result.maxResults = maxResult;
+    result.includePrefix = includePrefix;
+
+    return result;
+  }
+
+  private final String delimiter;
+  private final boolean includePrefix;
+  private final long maxResult;
+  private final String fields;
+
+  private ListObjectOptions(Builder builder) {
+    this.delimiter = builder.delimiter;
+    this.includePrefix = builder.includePrefix;
+    this.maxResult = builder.maxResults;
+    this.fields = builder.fields;
+  }
+
+  /** Delimiter to use (typically {@code /}), otherwise {@code null}. */
+  @Nullable
+  String getDelimiter() {
+    return delimiter;
+  }
+
+  /** Whether to include prefix object in the result. */
+  boolean isIncludePrefix() {
+    return includePrefix;
+  }
+
+  /** Maximum number of results to return, unlimited if negative or zero. */
+  long getMaxResults() {
+    return maxResult;
+  }
+
+  /**
+   * Comma separated list of object fields to include in the list response.
+   *
+   * <p>See <a
+   * href="https://cloud.google.com/storage/docs/json_api/v1/objects#resource-representations">
+   * object resource</a> for reference.
+   */
+  @Nullable
+  String getFields() {
+    return fields;
+  }
+
+  static class Builder {
+    private static final int MAX_RESULTS_UNLIMITED = -1;
+
+    static final String OBJECT_FIELDS =
+            String.join(
+                    /* delimiter= */ ",",
+                    "bucket",
+                    "name",
+                    "timeCreated",
+                    "updated",
+                    "generation",
+                    "metageneration",
+                    "size",
+                    "contentType",
+                    "contentEncoding",
+                    "md5Hash",
+                    "crc32c",
+                    "metadata");
+
+    private String delimiter;
+    private boolean includePrefix;
+
+    private long maxResults;
+
+    private String fields;
+
+    Builder() {
+      this.delimiter = PATH_DELIMITER;
+      this.includePrefix = false;
+      this.maxResults = MAX_RESULTS_UNLIMITED;
+      this.fields = OBJECT_FIELDS;
+    }
+
+    public Builder setDelimiter(String d) {
+      this.delimiter = d;
+      return this;
+    }
+
+    public Builder setIncludePrefix(boolean value) {
+      this.includePrefix = value;
+      return this;
+    }
+
+    public Builder setMaxResults(long mr) {
+      this.maxResults = mr;
+      return this;
+    }
+
+    public Builder setFields(String f) {
+      this.fields = f;
+      return this;
+    }
+
+    public ListObjectOptions build() {
+      return new ListObjectOptions(this);
+    }
+  }
+}
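
A brief sketch of how these options compose through the builder (illustrative only;
the field list and result cap are arbitrary, and the class is package-private, so
this compiles only inside org.apache.hadoop.fs.gs):

    // Flat listing (no delimiter), capped at 1000 results, with a reduced field set.
    ListObjectOptions options = ListObjectOptions.DEFAULT.builder()
        .setDelimiter(null)
        .setMaxResults(1000)
        .setFields("bucket,name,size")
        .build();
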
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/VerificationAttributes.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/VerificationAttributes.java
index 4155482fc7d..03de0a52e37 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/VerificationAttributes.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/VerificationAttributes.java
@@ -1,11 +1,13 @@
 /*
- * Copyright 2016 Google Inc.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestConfiguration.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestConfiguration.java
new file mode 100644
index 00000000000..f205276d372
--- /dev/null
+++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestConfiguration.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs;
+
+/** Access to test configurations values. */
+public abstract class TestConfiguration {
+  public static final String GCS_TEST_PROJECT_ID = "GCS_TEST_PROJECT_ID";
+  public static final String GCS_TEST_JSON_KEYFILE = "GCS_TEST_JSON_KEYFILE";
+
+  public static final String GCS_TEST_DIRECT_PATH_PREFERRED =
+      "GCS_TEST_DIRECT_PATH_PREFERRED";
+
+  /** Environment-based test configuration. */
+  public static class EnvironmentBasedTestConfiguration extends TestConfiguration {
+    @Override
+    public String getProjectId() {
+      return System.getenv(GCS_TEST_PROJECT_ID);
+    }
+
+    @Override
+    public String getServiceAccountJsonKeyFile() {
+      return System.getenv(GCS_TEST_JSON_KEYFILE);
+    }
+
+    @Override
+    public boolean isDirectPathPreferred() {
+      String envVar = System.getenv(GCS_TEST_DIRECT_PATH_PREFERRED);
+      // If the env variable is not configured, the default behaviour is to attempt directPath.
+      if (envVar == null) {
+        return true;
+      }
+      return Boolean.parseBoolean(envVar);
+    }
+  }
+
+  public static TestConfiguration getInstance() {
+    return LazyHolder.INSTANCE;
+  }
+
+  private static class LazyHolder {
+    private static final TestConfiguration INSTANCE = new EnvironmentBasedTestConfiguration();
+  }
+
+  public abstract String getProjectId();
+
+  public abstract String getServiceAccountJsonKeyFile();
+
+  public abstract boolean isDirectPathPreferred();
+}
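
For example, the environment-based configuration above is read entirely from the
GCS_TEST_* environment variables; a hedged sketch of consuming it (the demo class
name is hypothetical):

    import org.apache.hadoop.fs.gs.TestConfiguration;

    public class TestConfigDemo {
      public static void main(String[] args) {
        // Getters return null when the corresponding variable is unset, except
        // isDirectPathPreferred(), which defaults to true.
        TestConfiguration tc = TestConfiguration.getInstance();
        System.out.println("projectId=" + tc.getProjectId());
        System.out.println("keyFile=" + tc.getServiceAccountJsonKeyFile());
        System.out.println("directPathPreferred=" + tc.isDirectPathPreferred());
      }
    }
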
diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestStorageResourceId.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestStorageResourceId.java
index e0a39b2d7e4..e027c7b4091 100644
--- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestStorageResourceId.java
+++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestStorageResourceId.java
@@ -1,11 +1,13 @@
 /*
- * Copyright 2013 Google Inc.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestStringPaths.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestStringPaths.java
index 16234e0ce1d..a6b64ff7cff 100644
--- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestStringPaths.java
+++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestStringPaths.java
@@ -1,11 +1,13 @@
 /*
- * Copyright 2013 Google Inc.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestUriPaths.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestUriPaths.java
index fe93a28dc43..0325df52f9b 100644
--- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestUriPaths.java
+++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestUriPaths.java
@@ -1,11 +1,13 @@
 /*
- * Copyright 2013 Google Inc.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/GoogleContract.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/GoogleContract.java
new file mode 100644
index 00000000000..aa131981caf
--- /dev/null
+++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/GoogleContract.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs.contract;
+
+import org.apache.hadoop.fs.gs.TestConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractBondedFSContract;
+
+/** Contract of GoogleHadoopFileSystem via scheme "gs". */
+public class GoogleContract extends AbstractBondedFSContract {
+  private static final String CONTRACT_XML = "contract/gs.xml";
+
+  public GoogleContract(Configuration conf) {
+    super(conf);
+    addConfResource(CONTRACT_XML);
+    conf.set("fs.contract.test.fs.gs", "gs://arunchacko-oss-test-bucket"); // 
TODO:
+
+    TestConfiguration testConf = TestConfiguration.getInstance();
+    if (testConf.getProjectId() != null) {
+      conf.set("fs.gs.project.id", testConf.getProjectId());
+    }
+  }
+
+  @Override
+  public String getScheme() {
+    return "gs";
+  }
+}
diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractDelete.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractDelete.java
new file mode 100644
index 00000000000..7ed3834025c
--- /dev/null
+++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractDelete.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractDeleteTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+public class ITestGoogleContractDelete extends AbstractContractDeleteTest {
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new GoogleContract(conf);
+  }
+
+  @Override
+  public void testDeleteEmptyDirNonRecursive() {
+    // TODO: Enable this
+    ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change");
+  }
+}
diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractGetFileStatus.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractGetFileStatus.java
new file mode 100644
index 00000000000..aae16c2a410
--- /dev/null
+++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractGetFileStatus.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class ITestGoogleContractGetFileStatus extends AbstractContractGetFileStatusTest {
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new GoogleContract(conf);
+  }
+}
diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java
new file mode 100644
index 00000000000..26181f20385
--- /dev/null
+++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs.contract;
+
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractMkdirTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class ITestGoogleContractMkdir extends AbstractContractMkdirTest {
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new GoogleContract(conf);
+  }
+
+  @Override
+  public void testMkdirsDoesNotRemoveParentDirectories() {
+    // TODO: Enable this
+    ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change");
+  }
+
+  @Override
+  public void testCreateDirWithExistingDir() {
+    // TODO: Enable this
+    ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change");
+  }
+
+  @Override
+  public void testMkDirRmDir() {
+    // TODO: Enable this
+    ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change");
+  }
+
+  @Override
+  public void testNoMkdirOverFile() {
+    // TODO: Enable this
+    ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change");
+  }
+
+  @Override
+  public void testMkdirOverParentFile() {
+    // TODO: Enable this
+    ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change");
+  }
+}
diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/package-info.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/package-info.java
new file mode 100644
index 00000000000..8806dc9f45b
--- /dev/null
+++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Google Cloud Storage Filesystem contract tests.
+ */
+package org.apache.hadoop.fs.gs.contract;
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
