This is an automated email from the ASF dual-hosted git repository. cnauroth pushed a commit to branch HADOOP-19343 in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 13614c3bd3ee56f2979ce8a286f8cfe51c156e14 Author: Arunkumar Chacko <aruncha...@google.com> AuthorDate: Tue Jun 10 04:53:19 2025 +0000 HADOOP-19343: Add support for mkdir() and getFileStatus() Closes #7721 Signed-off-by: Chris Nauroth <cnaur...@apache.org> --- hadoop-project/pom.xml | 2 +- hadoop-tools/hadoop-gcp/pom.xml | 11 + .../org/apache/hadoop/fs/gs/ApiErrorExtractor.java | 327 +++++++++++++++++ .../apache/hadoop/fs/gs/CreateBucketOptions.java | 81 +++++ .../apache/hadoop/fs/gs/CreateObjectOptions.java | 127 +++++++ .../apache/hadoop/fs/gs/ErrorTypeExtractor.java | 47 ++- .../java/org/apache/hadoop/fs/gs/FileInfo.java | 14 +- .../apache/hadoop/fs/gs/GoogleCloudStorage.java | 385 ++++++++++++++++++++- .../hadoop/fs/gs/GoogleCloudStorageFileSystem.java | 285 +++++++++++++++ .../hadoop/fs/gs/GoogleCloudStorageItemInfo.java | 14 +- .../hadoop/fs/gs/GoogleHadoopFileSystem.java | 156 +++++++-- .../fs/gs/GoogleHadoopFileSystemConfiguration.java | 18 +- .../org/apache/hadoop/fs/gs/IoExceptionHelper.java | 83 +++++ .../org/apache/hadoop/fs/gs/ListFileOptions.java | 34 ++ .../org/apache/hadoop/fs/gs/ListObjectOptions.java | 141 ++++++++ .../hadoop/fs/gs/VerificationAttributes.java | 14 +- .../org/apache/hadoop/fs/gs/TestConfiguration.java | 64 ++++ .../apache/hadoop/fs/gs/TestStorageResourceId.java | 14 +- .../org/apache/hadoop/fs/gs/TestStringPaths.java | 14 +- .../java/org/apache/hadoop/fs/gs/TestUriPaths.java | 14 +- .../hadoop/fs/gs/contract/GoogleContract.java | 44 +++ .../fs/gs/contract/ITestGoogleContractDelete.java | 37 ++ .../contract/ITestGoogleContractGetFileStatus.java | 30 ++ .../fs/gs/contract/ITestGoogleContractMkdir.java | 61 ++++ .../apache/hadoop/fs/gs/contract/package-info.java | 22 ++ 25 files changed, 1959 insertions(+), 80 deletions(-) diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 3e0cb4dfbf9..1cb6b157ccd 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -2156,7 +2156,7 @@ <dependency> <groupId>com.google.cloud</groupId> <artifactId>google-cloud-storage</artifactId> - <version>2.44.1</version> + <version>2.52.0</version> </dependency> </dependencies> </dependencyManagement> diff --git a/hadoop-tools/hadoop-gcp/pom.xml b/hadoop-tools/hadoop-gcp/pom.xml index d5744f1f97c..2da2881ab79 100644 --- a/hadoop-tools/hadoop-gcp/pom.xml +++ b/hadoop-tools/hadoop-gcp/pom.xml @@ -261,6 +261,10 @@ <include>com.lmax</include> <include>io.grpc</include> <include>io.opencensus</include> + <include>io.opentelemetry</include> + <include>io.opentelemetry.api</include> + <include>io.opentelemetry.contrib</include> + <include>io.opentelemetry.semconv</include> <include>io.perfmark</include> <include>org.apache.httpcomponents</include> <include>org.threeten:threetenbp</include> @@ -282,6 +286,7 @@ <include>com.google.cloud.hadoop.util.**</include> <include>com.google.cloud.http.**</include> <include>com.google.cloud.monitoring.**</include> + <include>com.google.cloud.opentelemetry.**</include> <include>com.google.cloud.spi.**</include> <include>com.google.cloud.storage.**</include> <include>com.google.common.**</include> @@ -459,6 +464,12 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ApiErrorExtractor.java 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ApiErrorExtractor.java new file mode 100644 index 00000000000..4fef41b1971 --- /dev/null +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ApiErrorExtractor.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.gs; + +import com.google.api.client.googleapis.json.GoogleJsonError; +import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo; +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.http.HttpResponseException; +import com.google.api.client.http.HttpStatusCodes; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; +import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables; +import java.io.IOException; +import java.util.List; +import javax.annotation.Nullable; + +/** + * Translates exceptions from API calls into higher-level meaning, while allowing injectability for + * testing how API errors are handled. + */ +class ApiErrorExtractor { + + /** Singleton instance of the ApiErrorExtractor. */ + public static final ApiErrorExtractor INSTANCE = new ApiErrorExtractor(); + + public static final int STATUS_CODE_RANGE_NOT_SATISFIABLE = 416; + + public static final String GLOBAL_DOMAIN = "global"; + public static final String USAGE_LIMITS_DOMAIN = "usageLimits"; + + public static final String RATE_LIMITED_REASON = "rateLimitExceeded"; + public static final String USER_RATE_LIMITED_REASON = "userRateLimitExceeded"; + + public static final String QUOTA_EXCEEDED_REASON = "quotaExceeded"; + + // These come with "The account for ... has been disabled" message. + public static final String ACCOUNT_DISABLED_REASON = "accountDisabled"; + + // These come with "Project marked for deletion" message. + public static final String ACCESS_NOT_CONFIGURED_REASON = "accessNotConfigured"; + + // These are 400 error codes with "resource 'xyz' is not ready" message. + // These sometimes happen when a create operation is still in flight but the resource + // representation is already available via a get call. + public static final String RESOURCE_NOT_READY_REASON = "resourceNotReady"; + + // HTTP 413 with message "Value for field 'foo' is too large". + public static final String FIELD_SIZE_TOO_LARGE_REASON = "fieldSizeTooLarge"; + + // HTTP 400 message for 'USER_PROJECT_MISSING' error. + public static final String USER_PROJECT_MISSING_MESSAGE = + "Bucket is a requester pays bucket but no user project provided."; + + // The debugInfo field is present on the Errors collection in GoogleJsonException + // as an unknown key.
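+ // (Read back by getDebugInfo() below via ErrorInfo.getUnknownKeys().)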
+ private static final String DEBUG_INFO_FIELD = "debugInfo"; + + /** + * Determines if the given exception indicates intermittent request failure or failure caused by + * user error. + */ + public boolean requestFailure(IOException e) { + HttpResponseException httpException = getHttpResponseException(e); + return httpException != null + && (accessDenied(httpException) + || badRequest(httpException) + || internalServerError(httpException) + || rateLimited(httpException) + || IoExceptionHelper.isSocketError(httpException) + || unauthorized(httpException)); + } + + /** + * Determines if the given exception indicates 'access denied'. Recursively checks getCause() if + * outer exception isn't an instance of the correct class. + * + * <p>Warning: this method only checks for access denied status code, however this may include + * potentially recoverable reason codes such as rate limiting. For alternative, see {@link + * #accessDeniedNonRecoverable(IOException)}. + */ + public boolean accessDenied(IOException e) { + return recursiveCheckForCode(e, HttpStatusCodes.STATUS_CODE_FORBIDDEN); + } + + /** Determines if the given exception indicates bad request. */ + public boolean badRequest(IOException e) { + return recursiveCheckForCode(e, HttpStatusCodes.STATUS_CODE_BAD_REQUEST); + } + + /** + * Determines if the given exception indicates the request was unauthenticated. This can be caused + * by attaching invalid credentials to a request. + */ + public boolean unauthorized(IOException e) { + return recursiveCheckForCode(e, HttpStatusCodes.STATUS_CODE_UNAUTHORIZED); + } + + /** + * Determines if the exception is a non-recoverable access denied code (such as account closed or + * marked for deletion). + */ + public boolean accessDeniedNonRecoverable(IOException e) { + ErrorInfo errorInfo = getErrorInfo(e); + String reason = errorInfo != null ? errorInfo.getReason() : null; + return ACCOUNT_DISABLED_REASON.equals(reason) || ACCESS_NOT_CONFIGURED_REASON.equals(reason); + } + + /** Determines if the exception is a client error. */ + public boolean clientError(IOException e) { + HttpResponseException httpException = getHttpResponseException(e); + return httpException != null && getHttpStatusCode(httpException) / 100 == 4; + } + + /** Determines if the exception is an internal server error. */ + public boolean internalServerError(IOException e) { + HttpResponseException httpException = getHttpResponseException(e); + return httpException != null && getHttpStatusCode(httpException) / 100 == 5; + } + + /** + * Determines if the given exception indicates 'item already exists'. Recursively checks + * getCause() if outer exception isn't an instance of the correct class. + */ + public boolean itemAlreadyExists(IOException e) { + return recursiveCheckForCode(e, HttpStatusCodes.STATUS_CODE_CONFLICT); + } + + /** + * Determines if the given exception indicates 'item not found'. Recursively checks getCause() if + * outer exception isn't an instance of the correct class. + */ + public boolean itemNotFound(IOException e) { + return recursiveCheckForCode(e, HttpStatusCodes.STATUS_CODE_NOT_FOUND); + } + + /** + * Determines if the given exception indicates 'field size too large'. Recursively checks + * getCause() if outer exception isn't an instance of the correct class. 
+ */ + public boolean fieldSizeTooLarge(IOException e) { + ErrorInfo errorInfo = getErrorInfo(e); + return errorInfo != null && FIELD_SIZE_TOO_LARGE_REASON.equals(errorInfo.getReason()); + } + + /** + * Determines if the given exception indicates 'resource not ready'. Recursively checks getCause() + * if outer exception isn't an instance of the correct class. + */ + public boolean resourceNotReady(IOException e) { + ErrorInfo errorInfo = getErrorInfo(e); + return errorInfo != null && RESOURCE_NOT_READY_REASON.equals(errorInfo.getReason()); + } + + /** + * Determines if the given IOException indicates 'precondition not met'. Recursively checks + * getCause() if outer exception isn't an instance of the correct class. + */ + public boolean preconditionNotMet(IOException e) { + return recursiveCheckForCode(e, HttpStatusCodes.STATUS_CODE_PRECONDITION_FAILED); + } + + /** + * Determines if the given exception indicates 'range not satisfiable'. Recursively checks + * getCause() if outer exception isn't an instance of the correct class. + */ + public boolean rangeNotSatisfiable(IOException e) { + return recursiveCheckForCode(e, STATUS_CODE_RANGE_NOT_SATISFIABLE); + } + + /** + * Determines if a given Throwable is caused by a rate limit being applied. Recursively checks + * getCause() if outer exception isn't an instance of the correct class. + * + * @param e The Throwable to check. + * @return True if the Throwable is a result of rate limiting being applied. + */ + public boolean rateLimited(IOException e) { + ErrorInfo errorInfo = getErrorInfo(e); + if (errorInfo != null) { + String domain = errorInfo.getDomain(); + boolean isRateLimitedOrGlobalDomain = + USAGE_LIMITS_DOMAIN.equals(domain) || GLOBAL_DOMAIN.equals(domain); + String reason = errorInfo.getReason(); + boolean isRateLimitedReason = + RATE_LIMITED_REASON.equals(reason) || USER_RATE_LIMITED_REASON.equals(reason); + return isRateLimitedOrGlobalDomain && isRateLimitedReason; + } + return false; + } + + /** + * Determines if a given Throwable is caused by Quota Exceeded. Recursively checks getCause() if + * outer exception isn't an instance of the correct class. + */ + public boolean quotaExceeded(IOException e) { + ErrorInfo errorInfo = getErrorInfo(e); + return errorInfo != null && QUOTA_EXCEEDED_REASON.equals(errorInfo.getReason()); + } + + /** + * Determines if the given exception indicates that 'userProject' is missing in the request. + * Recursively checks getCause() if outer exception isn't an instance of the correct class. + */ + public boolean userProjectMissing(IOException e) { + GoogleJsonError jsonError = getJsonError(e); + return jsonError != null + && jsonError.getCode() == HttpStatusCodes.STATUS_CODE_BAD_REQUEST + && USER_PROJECT_MISSING_MESSAGE.equals(jsonError.getMessage()); + } + + /** Extracts the error message. */ + public String getErrorMessage(IOException e) { + // Prefer to use message from GJRE. + GoogleJsonError jsonError = getJsonError(e); + return jsonError == null ? e.getMessage() : jsonError.getMessage(); + } + + /** + * Converts the exception to a user-presentable error message. Specifically, extracts message + * field for HTTP 4xx codes, and creates a generic "Internal Server Error" for HTTP 5xx codes. + * + * @param e the exception + * @param action the description of the action being performed at the time of error.
+ * @see #toUserPresentableMessage(IOException, String) + */ + public IOException toUserPresentableException(IOException e, String action) throws IOException { + throw new IOException(toUserPresentableMessage(e, action), e); + } + + /** + * Converts the exception to a user-presentable error message. Specifically, extracts message + * field for HTTP 4xx codes, and creates a generic "Internal Server Error" for HTTP 5xx codes. + */ + public String toUserPresentableMessage(IOException e, @Nullable String action) { + String message = "Internal server error"; + if (clientError(e)) { + message = getErrorMessage(e); + } + return action == null + ? message + : String.format("Encountered an error while %s: %s", action, message); + } + + /** See {@link #toUserPresentableMessage(IOException, String)}. */ + public String toUserPresentableMessage(IOException e) { + return toUserPresentableMessage(e, null); + } + + @Nullable + public String getDebugInfo(IOException e) { + ErrorInfo errorInfo = getErrorInfo(e); + return errorInfo != null ? (String) errorInfo.getUnknownKeys().get(DEBUG_INFO_FIELD) : null; + } + + /** + * Returns HTTP status code from the given exception. + * + * <p>Note: the GoogleJsonResponseException.getStatusCode() method is marked final, therefore it + * cannot be mocked using Mockito. We use this helper so that we can override it in tests. + */ + protected int getHttpStatusCode(HttpResponseException e) { + return e.getStatusCode(); + } + + /** + * Get the first ErrorInfo from an IOException if it is an instance of + * GoogleJsonResponseException, otherwise return null. + */ + @Nullable + protected ErrorInfo getErrorInfo(IOException e) { + GoogleJsonError jsonError = getJsonError(e); + List<ErrorInfo> errors = jsonError != null ? jsonError.getErrors() : ImmutableList.of(); + return errors != null ? Iterables.getFirst(errors, null) : null; + } + + /** If the exception is a GoogleJsonResponseException, get the error details, else return null. */ + @Nullable + protected GoogleJsonError getJsonError(IOException e) { + GoogleJsonResponseException jsonException = getJsonResponseException(e); + return jsonException == null ? null : jsonException.getDetails(); + } + + /** Recursively checks getCause() if outer exception isn't an instance of the correct class.
*/ + protected boolean recursiveCheckForCode(IOException e, int code) { + HttpResponseException httpException = getHttpResponseException(e); + return httpException != null && getHttpStatusCode(httpException) == code; + } + + @Nullable + public static GoogleJsonResponseException getJsonResponseException(Throwable throwable) { + Throwable cause = throwable; + while (cause != null) { + if (cause instanceof GoogleJsonResponseException) { + return (GoogleJsonResponseException) cause; + } + cause = cause.getCause(); + } + return null; + } + + @Nullable + public static HttpResponseException getHttpResponseException(Throwable throwable) { + Throwable cause = throwable; + while (cause != null) { + if (cause instanceof HttpResponseException) { + return (HttpResponseException) cause; + } + cause = cause.getCause(); + } + return null; + } +} diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateBucketOptions.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateBucketOptions.java new file mode 100644 index 00000000000..46cd2a7efbd --- /dev/null +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateBucketOptions.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.gs; + +import java.time.Duration; + +final class CreateBucketOptions { + // TODO: Make sure the defaults have the setting matching the existing connector. 
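+ //
+ // Illustrative usage (not part of this change; values are hypothetical):
+ //
+ //   CreateBucketOptions options = new CreateBucketOptions.Builder()
+ //       .withLocation("US")
+ //       .withStorageClass("STANDARD")
+ //       .withTtl(Duration.ofDays(7))
+ //       .build();
+ //
+ // createBucket() in GoogleCloudStorage maps the TTL to a lifecycle delete
+ // rule and resolves the storage class with StorageClass.valueOfStrict().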
+ static final CreateBucketOptions DEFAULT = new Builder().build(); + private final String location; + private final String storageClass; + private final Duration ttl; + private final String projectId; + + private CreateBucketOptions(Builder builder) { + this.location = builder.location; + this.storageClass = builder.storageClass; + this.ttl = builder.ttl; + this.projectId = builder.projectId; + } + + public String getLocation() { + return location; + } + + public String getStorageClass() { + return storageClass; + } + + public Duration getTtl() { + return ttl; + } + + static class Builder { + private String location; + private String storageClass; + private Duration ttl; + private String projectId; + + public Builder withLocation(String loc) { + this.location = loc; + return this; + } + + public Builder withStorageClass(String sc) { + this.storageClass = sc; + return this; + } + + public Builder withTtl(Duration ttlDuration) { + this.ttl = ttlDuration; + return this; + } + + public Builder withProjectId(String pid) { + this.projectId = pid; + return this; + } + + public CreateBucketOptions build() { + return new CreateBucketOptions(this); + } + } +} + diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateObjectOptions.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateObjectOptions.java new file mode 100644 index 00000000000..26c91fcae7b --- /dev/null +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateObjectOptions.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.gs; + +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; + +import java.util.HashMap; +import java.util.Map; + +/** Options that can be specified when creating a file in the {@link GoogleCloudStorage}.
*/ + +final class CreateObjectOptions { + static final CreateObjectOptions DEFAULT_OVERWRITE = builder().setOverwriteExisting(true).build(); + + private final String contentEncoding; + private final String contentType; + private final boolean ensureEmptyObjectsMetadataMatch; + private final String kmsKeyName; + private final ImmutableMap<String, byte[]> metadata; + private final boolean overwriteExisting; + + private CreateObjectOptions(Builder builder) { + this.contentEncoding = builder.contentEncoding; + this.contentType = builder.contentType; + this.ensureEmptyObjectsMetadataMatch = builder.ensureEmptyObjectsMetadataMatch; + this.kmsKeyName = builder.kmsKeyName; + this.metadata = ImmutableMap.copyOf(builder.metadata); + this.overwriteExisting = builder.overwriteExisting; + } + + public static Builder builder() { + return new Builder(); + } + + public String getContentEncoding() { + return contentEncoding; + } + + public String getContentType() { + return contentType; + } + + public boolean isEnsureEmptyObjectsMetadataMatch() { + return ensureEmptyObjectsMetadataMatch; + } + + public String getKmsKeyName() { + return kmsKeyName; + } + + public Map<String, byte[]> getMetadata() { + return metadata; + } + + public boolean isOverwriteExisting() { + return overwriteExisting; + } + + public Builder toBuilder() { + return builder().setContentEncoding(this.contentEncoding).setContentType(this.contentType) + .setEnsureEmptyObjectsMetadataMatch(this.ensureEmptyObjectsMetadataMatch) + .setKmsKeyName(this.kmsKeyName).setMetadata(this.metadata) + .setOverwriteExisting(this.overwriteExisting); + } + + static final class Builder { + private String contentEncoding; + private String contentType; + private boolean ensureEmptyObjectsMetadataMatch = false; + private String kmsKeyName; + private Map<String, byte[]> metadata = new HashMap<>(); + private boolean overwriteExisting = false; + + private Builder() { + } + + public Builder setContentEncoding(String ce) { + this.contentEncoding = ce; + return this; + } + + public Builder setContentType(String ct) { + this.contentType = ct; + return this; + } + + public Builder setEnsureEmptyObjectsMetadataMatch(boolean val) { + this.ensureEmptyObjectsMetadataMatch = val; + return this; + } + + public Builder setKmsKeyName(String key) { + this.kmsKeyName = key; + return this; + } + + public Builder setMetadata(Map<String, byte[]> m) { + this.metadata = m; + return this; + } + + public Builder setOverwriteExisting(boolean overwrite) { + this.overwriteExisting = overwrite; + return this; + } + + public CreateObjectOptions build() { + return new CreateObjectOptions(this); + } + } +} diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ErrorTypeExtractor.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ErrorTypeExtractor.java index a4497734524..547d855d1d6 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ErrorTypeExtractor.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ErrorTypeExtractor.java @@ -1,11 +1,13 @@ /* - * Copyright 2023 Google LLC + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,13 +18,46 @@ package org.apache.hadoop.fs.gs; +import javax.annotation.Nullable; + import io.grpc.Status; +import io.grpc.StatusRuntimeException; /** * Implementation for {@link ErrorTypeExtractor} for exception specifically thrown from gRPC path. */ final class ErrorTypeExtractor { + static boolean bucketAlreadyExists(Exception e) { + ErrorType errorType = getErrorType(e); + if (errorType == ErrorType.ALREADY_EXISTS) { + return true; + } else if (errorType == ErrorType.FAILED_PRECONDITION) { + // The gRPC API currently throws a FAILED_PRECONDITION status code instead of ALREADY_EXISTS, + // so we handle both these conditions in the interim. + StatusRuntimeException statusRuntimeException = getStatusRuntimeException(e); + return statusRuntimeException != null + && BUCKET_ALREADY_EXISTS_MESSAGE.equals(statusRuntimeException.getMessage()); + } + return false; + } + + @Nullable + private static StatusRuntimeException getStatusRuntimeException(Exception e) { + Throwable cause = e; + // Keeping a counter to break early from the loop to avoid infinite loop condition due to + // cyclic exception chains. + int currentExceptionDepth = 0, maxChainDepth = 1000; + while (cause != null && currentExceptionDepth < maxChainDepth) { + if (cause instanceof StatusRuntimeException) { + return (StatusRuntimeException) cause; + } + cause = cause.getCause(); + currentExceptionDepth++; + } + return null; + } + enum ErrorType { NOT_FOUND, OUT_OF_RANGE, ALREADY_EXISTS, FAILED_PRECONDITION, INTERNAL, RESOURCE_EXHAUSTED, UNAVAILABLE, UNKNOWN diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/FileInfo.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/FileInfo.java index df8d63f5eec..3b9d9f475ae 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/FileInfo.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/FileInfo.java @@ -1,11 +1,13 @@ /* - * Copyright 2013 Google Inc. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java index 9c15962b7ef..d68eca6a8a5 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java @@ -20,7 +20,12 @@ import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.*; import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.isNullOrEmpty; +import static java.lang.Math.toIntExact; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.ExponentialBackOff; +import com.google.api.client.util.Sleeper; +import com.google.api.gax.paging.Page; import com.google.cloud.storage.*; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hadoop.thirdparty.com.google.common.collect.Maps; @@ -32,21 +37,34 @@ import java.io.IOException; import java.nio.channels.WritableByteChannel; import java.nio.file.FileAlreadyExistsException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; /** * A wrapper around <a href="https://github.com/googleapis/java-storage">Google cloud storage * client</a>. */ class GoogleCloudStorage { - public static final Logger LOG = LoggerFactory.getLogger(GoogleHadoopFileSystem.class); + static final Logger LOG = LoggerFactory.getLogger(GoogleHadoopFileSystem.class); static final List<Storage.BlobField> BLOB_FIELDS = - ImmutableList.of(Storage.BlobField.BUCKET, Storage.BlobField.CONTENT_ENCODING, + ImmutableList.of( + Storage.BlobField.BUCKET, Storage.BlobField.CONTENT_ENCODING, Storage.BlobField.CONTENT_TYPE, Storage.BlobField.CRC32C, Storage.BlobField.GENERATION, Storage.BlobField.METADATA, Storage.BlobField.MD5HASH, Storage.BlobField.METAGENERATION, Storage.BlobField.NAME, Storage.BlobField.SIZE, Storage.BlobField.TIME_CREATED, Storage.BlobField.UPDATED); + + static final CreateObjectOptions EMPTY_OBJECT_CREATE_OPTIONS = + CreateObjectOptions.DEFAULT_OVERWRITE.toBuilder() + .setEnsureEmptyObjectsMetadataMatch(false) + .build(); + private final Storage storage; private final GoogleHadoopFileSystemConfiguration configuration; @@ -55,13 +73,20 @@ class GoogleCloudStorage { * is in WIP. 
*/ GoogleCloudStorage(GoogleHadoopFileSystemConfiguration configuration) throws IOException { - // TODO: Set projectId // TODO: Set credentials - this.storage = StorageOptions.newBuilder().build().getService(); + this.storage = createStorage(configuration.getProjectId()); this.configuration = configuration; } - public WritableByteChannel create(final StorageResourceId resourceId, final CreateOptions options) + private static Storage createStorage(String projectId) { + if (projectId != null) { + return StorageOptions.newBuilder().setProjectId(projectId).build().getService(); + } + + return StorageOptions.newBuilder().build().getService(); + } + + WritableByteChannel create(final StorageResourceId resourceId, final CreateOptions options) throws IOException { LOG.trace("create({})", resourceId); @@ -104,7 +129,7 @@ private long getWriteGeneration(StorageResourceId resourceId, boolean overwrite) throw new FileAlreadyExistsException(String.format("Object %s already exists.", resourceId)); } - public void close() { + void close() { try { storage.close(); } catch (Exception e) { @@ -112,7 +137,7 @@ public void close() { } } - public GoogleCloudStorageItemInfo getItemInfo(StorageResourceId resourceId) throws IOException { + GoogleCloudStorageItemInfo getItemInfo(StorageResourceId resourceId) throws IOException { LOG.trace("getItemInfo({})", resourceId); // Handle ROOT case first. @@ -258,4 +283,350 @@ private static GoogleCloudStorageItemInfo createItemInfoForBucket(StorageResourc bucket.getLocation(), bucket.getStorageClass() == null ? null : bucket.getStorageClass().name()); } + + List<GoogleCloudStorageItemInfo> listObjectInfo( + String bucketName, + String objectNamePrefix, + ListObjectOptions listOptions) throws IOException { + try { + long maxResults = listOptions.getMaxResults() > 0 ? + listOptions.getMaxResults() + (listOptions.isIncludePrefix() ? 0 : 1) : + listOptions.getMaxResults(); + + Storage.BlobListOption[] blobListOptions = + getBlobListOptions(objectNamePrefix, listOptions, maxResults); + Page<Blob> blobs = storage.list(bucketName, blobListOptions); + ListOperationResult result = new ListOperationResult(maxResults); + for (Blob blob : blobs.iterateAll()) { + result.add(blob); + } + + return result.getItems(); + } catch (StorageException e) { + throw new IOException( + String.format("listing object '%s' failed.", BlobId.of(bucketName, objectNamePrefix)), + e); + } + } + + private Storage.BlobListOption[] getBlobListOptions( + String objectNamePrefix, ListObjectOptions listOptions, long maxResults) { + List<Storage.BlobListOption> options = new ArrayList<>(); + + options.add(Storage.BlobListOption.fields(BLOB_FIELDS.toArray(new Storage.BlobField[0]))); + options.add(Storage.BlobListOption.prefix(objectNamePrefix)); + // TODO: set max results as a BlobListOption + if ("/".equals(listOptions.getDelimiter())) { + options.add(Storage.BlobListOption.currentDirectory()); + } + + if (listOptions.getDelimiter() != null) { + options.add(Storage.BlobListOption.includeTrailingDelimiter()); + } + + return options.toArray(new Storage.BlobListOption[0]); + } + + private GoogleCloudStorageItemInfo createItemInfoForBlob(Blob blob) { + long generationId = blob.getGeneration() == null ? 
0L : blob.getGeneration(); + StorageResourceId resourceId = + new StorageResourceId(blob.getBucket(), blob.getName(), generationId); + return createItemInfoForBlob(resourceId, blob); + } + + void createBucket(String bucketName, CreateBucketOptions options) throws IOException { + LOG.trace("createBucket({})", bucketName); + checkArgument(!isNullOrEmpty(bucketName), "bucketName must not be null or empty"); + checkNotNull(options, "options must not be null"); + + BucketInfo.Builder bucketInfoBuilder = + BucketInfo.newBuilder(bucketName).setLocation(options.getLocation()); + + if (options.getStorageClass() != null) { + bucketInfoBuilder.setStorageClass( + StorageClass.valueOfStrict(options.getStorageClass().toUpperCase())); + } + if (options.getTtl() != null) { + bucketInfoBuilder.setLifecycleRules( + Collections.singletonList( + new BucketInfo.LifecycleRule( + BucketInfo.LifecycleRule.LifecycleAction.newDeleteAction(), + BucketInfo.LifecycleRule.LifecycleCondition.newBuilder() + .setAge(toIntExact(options.getTtl().toDays())) + .build()))); + } + try { + storage.create(bucketInfoBuilder.build()); + } catch (StorageException e) { + if (ErrorTypeExtractor.bucketAlreadyExists(e)) { + throw (FileAlreadyExistsException) + new FileAlreadyExistsException(String.format("Bucket '%s' already exists.", bucketName)) + .initCause(e); + } + throw new IOException(e); + } + } + + void createEmptyObject(StorageResourceId resourceId) throws IOException { + LOG.trace("createEmptyObject({})", resourceId); + checkArgument( + resourceId.isStorageObject(), "Expected full StorageObject id, got %s", resourceId); + createEmptyObject(resourceId, EMPTY_OBJECT_CREATE_OPTIONS); + } + + void createEmptyObject(StorageResourceId resourceId, CreateObjectOptions options) + throws IOException { + checkArgument( + resourceId.isStorageObject(), "Expected full StorageObject id, got %s", resourceId); + + try { + createEmptyObjectInternal(resourceId, options); + } catch (StorageException e) { + if (canIgnoreExceptionForEmptyObject(e, resourceId, options)) { + LOG.info( + "Ignoring exception of type {}; verified object already exists with desired state.", + e.getClass().getSimpleName()); + LOG.trace("Ignored exception while creating empty object: {}", resourceId, e); + } else { + if (ErrorTypeExtractor.getErrorType(e) == ErrorTypeExtractor.ErrorType.ALREADY_EXISTS) { + throw (FileAlreadyExistsException) + new FileAlreadyExistsException( + String.format("Object '%s' already exists.", resourceId) + ).initCause(e); + } + throw new IOException(e); + } + } + } + + /** + * Helper to check whether an empty object already exists with the expected metadata specified in + * {@code options}, to be used to determine whether it's safe to ignore an exception that was + * thrown when trying to create the object, {@code exceptionOnCreate}. + */ + private boolean canIgnoreExceptionForEmptyObject( + StorageException exceptionOnCreate, StorageResourceId resourceId, CreateObjectOptions options) + throws IOException { + ErrorTypeExtractor.ErrorType errorType = ErrorTypeExtractor.getErrorType(exceptionOnCreate); + if (shouldBackoff(resourceId, errorType)) { + GoogleCloudStorageItemInfo existingInfo; + Duration maxWaitTime = Duration.ofSeconds(3); // TODO: make this configurable + + BackOff backOff = + !maxWaitTime.isZero() && !maxWaitTime.isNegative() + ? 
new ExponentialBackOff.Builder() + .setMaxElapsedTimeMillis(toIntExact(maxWaitTime.toMillis())) + .setMaxIntervalMillis(500) + .setInitialIntervalMillis(100) + .setMultiplier(1.5) + .setRandomizationFactor(0.15) + .build() + : BackOff.STOP_BACKOFF; + long nextSleep = 0L; + do { + if (nextSleep > 0) { + try { + Sleeper.DEFAULT.sleep(nextSleep); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + nextSleep = BackOff.STOP; + } + } + existingInfo = getItemInfo(resourceId); + nextSleep = nextSleep == BackOff.STOP ? BackOff.STOP : backOff.nextBackOffMillis(); + } while (!existingInfo.exists() && nextSleep != BackOff.STOP); + + // Compare existence, size, and metadata; for 429 errors creating an empty object, + // we don't care about metaGeneration/contentGeneration as long as the metadata + // matches, since we don't know for sure whether our low-level request succeeded + // first or some other client succeeded first. + if (existingInfo.exists() && existingInfo.getSize() == 0) { + if (options.isEnsureEmptyObjectsMetadataMatch()) { + return existingInfo.metadataEquals(options.getMetadata()); + } + return true; + } + } + return false; + } + + private static boolean shouldBackoff(StorageResourceId resourceId, + ErrorTypeExtractor.ErrorType errorType) { + return errorType == ErrorTypeExtractor.ErrorType.RESOURCE_EXHAUSTED + || errorType == ErrorTypeExtractor.ErrorType.INTERNAL || + (resourceId.isDirectory() && errorType == ErrorTypeExtractor.ErrorType.FAILED_PRECONDITION); + } + + private void createEmptyObjectInternal( + StorageResourceId resourceId, CreateObjectOptions createObjectOptions) throws IOException { + Map<String, String> rewrittenMetadata = encodeMetadata(createObjectOptions.getMetadata()); + + List<Storage.BlobTargetOption> blobTargetOptions = new ArrayList<>(); + blobTargetOptions.add(Storage.BlobTargetOption.disableGzipContent()); + if (resourceId.hasGenerationId()) { + blobTargetOptions.add(Storage.BlobTargetOption.generationMatch(resourceId.getGenerationId())); + } else if (resourceId.isDirectory() || !createObjectOptions.isOverwriteExisting()) { + blobTargetOptions.add(Storage.BlobTargetOption.doesNotExist()); + } + + try { + // TODO: Set encryption key and related properties + storage.create( + BlobInfo.newBuilder(BlobId.of(resourceId.getBucketName(), resourceId.getObjectName())) + .setMetadata(rewrittenMetadata) + .setContentEncoding(createObjectOptions.getContentEncoding()) + .setContentType(createObjectOptions.getContentType()) + .build(), + blobTargetOptions.toArray(new Storage.BlobTargetOption[0])); + } catch (StorageException e) { + throw new IOException(String.format("Creating empty object %s failed.", resourceId), e); + } + } + + private static Map<String, String> encodeMetadata(Map<String, byte[]> metadata) { + return Maps.transformValues(metadata, GoogleCloudStorage::encodeMetadataValues); + } + + private static String encodeMetadataValues(byte[] bytes) { + return bytes == null ? 
null : BaseEncoding.base64().encode(bytes); + } + + List<GoogleCloudStorageItemInfo> listDirectoryRecursive(String bucketName, String objectName) + throws IOException { + // TODO: Take delimiter from config + // TODO: Set specific fields + + try { + Page<Blob> blobs = storage.list( + bucketName, + Storage.BlobListOption.prefix(objectName)); + + List<GoogleCloudStorageItemInfo> result = new ArrayList<>(); + for (Blob blob : blobs.iterateAll()) { + result.add(createItemInfoForBlob(blob)); + } + + return result; + } catch (StorageException e) { + throw new IOException( + String.format("Listing '%s' failed", BlobId.of(bucketName, objectName)), e); + } + } + + void deleteObjects(List<StorageResourceId> fullObjectNames) throws IOException { + LOG.trace("deleteObjects({})", fullObjectNames); + + if (fullObjectNames.isEmpty()) { + return; + } + + // Validate that all the elements represent StorageObjects. + for (StorageResourceId toDelete : fullObjectNames) { + checkArgument( + toDelete.isStorageObject(), + "Expected full StorageObject names only, got: %s", + toDelete); + } + + // TODO: Do this concurrently + // TODO: There is duplication. fix it + for (StorageResourceId toDelete : fullObjectNames) { + try { + LOG.trace("Deleting Object ({})", toDelete); + if (toDelete.hasGenerationId() && toDelete.getGenerationId() != 0) { + storage.delete( + BlobId.of(toDelete.getBucketName(), toDelete.getObjectName()), + Storage.BlobSourceOption.generationMatch(toDelete.getGenerationId())); + } else { + // TODO: Remove delete without generationId + storage.delete(BlobId.of(toDelete.getBucketName(), toDelete.getObjectName())); + + LOG.trace("Deleting Object without generationId ({})", toDelete); + } + } catch (StorageException e) { + throw new IOException(String.format("Deleting resource %s failed.", toDelete), e); + } + } + } + + List<GoogleCloudStorageItemInfo> listBucketInfo() throws IOException { + List<Bucket> allBuckets = listBucketsInternal(); + List<GoogleCloudStorageItemInfo> bucketInfos = new ArrayList<>(allBuckets.size()); + for (Bucket bucket : allBuckets) { + bucketInfos.add(createItemInfoForBucket(new StorageResourceId(bucket.getName()), bucket)); + } + return bucketInfos; + } + + + private List<Bucket> listBucketsInternal() throws IOException { + checkNotNull(configuration.getProjectId(), "projectId must not be null"); + List<Bucket> allBuckets = new ArrayList<>(); + try { + Page<Bucket> buckets = + storage.list( + Storage.BucketListOption.pageSize(configuration.getMaxListItemsPerCall()), + Storage.BucketListOption.fields( + Storage.BucketField.LOCATION, + Storage.BucketField.STORAGE_CLASS, + Storage.BucketField.TIME_CREATED, + Storage.BucketField.UPDATED)); + + // Loop to fetch all the items. + for (Bucket bucket : buckets.iterateAll()) { + allBuckets.add(bucket); + } + } catch (StorageException e) { + throw new IOException(e); + } + return allBuckets; + } + + // Helper class to capture the results of list operation. 
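+ //
+ // With a delimiter set, the listing can return both a real object and a
+ // synthetic directory entry for the same path; the synthetic entries carry
+ // no generation (see includeTrailingDelimiter() above), which appears to be
+ // what add() below keys on. Illustrative (hypothetical paths):
+ //
+ //   gs://bucket/dir/      generation=null -> kept in 'prefixes'
+ //   gs://bucket/dir/      generation=42   -> moved to 'objects'
+ //   gs://bucket/dir/file  generation=7    -> kept in 'objects'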
+ private class ListOperationResult { + private final Map<String, Blob> prefixes = new HashMap<>(); + private final List<Blob> objects = new ArrayList<>(); + + private final Set<String> objectsSet = new HashSet<>(); + + private final long maxResults; + + ListOperationResult(long maxResults) { + this.maxResults = maxResults; + } + + void add(Blob blob) { + String path = blob.getBlobId().toGsUtilUri(); + if (blob.getGeneration() != null) { + prefixes.remove(path); + objects.add(blob); + + objectsSet.add(path); + } else if (!objectsSet.contains(path)) { + prefixes.put(path, blob); + } + } + + List<GoogleCloudStorageItemInfo> getItems() { + List<GoogleCloudStorageItemInfo> result = new ArrayList<>(prefixes.size() + objects.size()); + + for (Blob blob : objects) { + result.add(createItemInfoForBlob(blob)); + + if (result.size() == maxResults) { + return result; + } + } + + for (Blob blob : prefixes.values()) { + if (result.size() == maxResults) { + return result; + } + + result.add(createItemInfoForBlob(blob)); + } + + return result; + } + } } diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java index e411f22eb39..aa1617e4da6 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java @@ -19,21 +19,57 @@ package org.apache.hadoop.fs.gs; import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.*; +import static java.util.Comparator.comparing; +import static org.apache.hadoop.fs.gs.Constants.PATH_DELIMITER; import static org.apache.hadoop.fs.gs.Constants.SCHEME; import com.google.auth.Credentials; +import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; +import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.nio.channels.WritableByteChannel; +import java.nio.file.DirectoryNotEmptyException; +import java.nio.file.FileAlreadyExistsException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; /** * Provides FS semantics over GCS based on Objects API. */ class GoogleCloudStorageFileSystem { private static final Logger LOG = LoggerFactory.getLogger(StorageResourceId.class); + // Comparator used for sorting paths. + // + // For some bulk operations, we need to operate on parent directories before + // we operate on their children. To achieve this, we sort paths such that + // shorter paths appear before longer paths. Also, we sort lexicographically + // within paths of the same length (this is not strictly required but helps when + // debugging/testing). + @VisibleForTesting + static final Comparator<URI> PATH_COMPARATOR = + comparing( + URI::toString, + (as, bs) -> + (as.length() == bs.length()) + ? 
as.compareTo(bs) + : Integer.compare(as.length(), bs.length())); + + static final Comparator<FileInfo> FILE_INFO_PATH_COMPARATOR = + comparing(FileInfo::getPath, PATH_COMPARATOR); + + private static final ListObjectOptions GET_FILE_INFO_LIST_OPTIONS = + ListObjectOptions.DEFAULT.builder().setIncludePrefix(true).setMaxResults(1).build(); + + private static final ListObjectOptions LIST_FILE_INFO_LIST_OPTIONS = + ListObjectOptions.DEFAULT.builder().setIncludePrefix(true).build(); // URI of the root path. static final URI GCSROOT = URI.create(SCHEME + ":/"); @@ -86,4 +122,253 @@ void close() { gcs = null; } } + + public FileInfo getFileInfo(URI path) throws IOException { + checkArgument(path != null, "path must not be null"); + // Validate the given path. true == allow empty object name. + // One should be able to get info about top level directory (== bucket), + // therefore we allow object name to be empty. + StorageResourceId resourceId = StorageResourceId.fromUriPath(path, true); + FileInfo fileInfo = + FileInfo.fromItemInfo( + getFileInfoInternal(resourceId, /* inferImplicitDirectories= */ true)); + LOG.trace("getFileInfo(path: {}): {}", path, fileInfo); + return fileInfo; + } + + private GoogleCloudStorageItemInfo getFileInfoInternal( + StorageResourceId resourceId, + boolean inferImplicitDirectories) + throws IOException { + if (resourceId.isRoot() || resourceId.isBucket()) { + return gcs.getItemInfo(resourceId); + } + + StorageResourceId dirId = resourceId.toDirectoryId(); + if (!resourceId.isDirectory()) { + GoogleCloudStorageItemInfo itemInfo = gcs.getItemInfo(resourceId); + if (itemInfo.exists()) { + return itemInfo; + } + + if (inferImplicitDirectories) { + // TODO: Set max result + List<GoogleCloudStorageItemInfo> listDirResult = gcs.listObjectInfo( + resourceId.getBucketName(), + resourceId.getObjectName(), + GET_FILE_INFO_LIST_OPTIONS); + LOG.trace("List for getMetadata returned {}. {}", listDirResult.size(), listDirResult); + if (!listDirResult.isEmpty()) { + LOG.trace("Get metadata for directory returned non empty {}", listDirResult); + return GoogleCloudStorageItemInfo.createInferredDirectory(resourceId.toDirectoryId()); + } + } + } + + List<GoogleCloudStorageItemInfo> listDirInfo = ImmutableList.of(gcs.getItemInfo(dirId)); + if (listDirInfo.isEmpty()) { + return GoogleCloudStorageItemInfo.createNotFound(resourceId); + } + checkState(listDirInfo.size() <= 2, "listed more than 2 objects: '%s'", listDirInfo); + GoogleCloudStorageItemInfo dirInfo = Iterables.get(listDirInfo, /* position= */ 0); + checkState( + dirInfo.getResourceId().equals(dirId) || !inferImplicitDirectories, + "listed wrong object '%s', but should be '%s'", + dirInfo.getResourceId(), + resourceId); + return dirInfo.getResourceId().equals(dirId) && dirInfo.exists() + ? dirInfo + : GoogleCloudStorageItemInfo.createNotFound(resourceId); + } + + public void mkdirs(URI path) throws IOException { + LOG.trace("mkdirs(path: {})", path); + checkNotNull(path, "path should not be null"); + + /* allowEmptyObjectName= */ + StorageResourceId resourceId = + StorageResourceId.fromUriPath(path, /* allowEmptyObjectName= */ true); + if (resourceId.isRoot()) { + // GCS_ROOT directory always exists, no need to go through the rest of the method. 
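+ // For reference (illustrative), StorageResourceId.fromUriPath() resolves:
+ //   gs:/            -> root (returns here)
+ //   gs://bucket     -> bucket (created below if missing)
+ //   gs://bucket/dir -> object (converted to a directory id below)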
+ return; + } + + // In case path is a bucket, we just attempt to create it without additional checks + if (resourceId.isBucket()) { + try { + gcs.createBucket(resourceId.getBucketName(), CreateBucketOptions.DEFAULT); + } catch (FileAlreadyExistsException e) { + // This means that the bucket already exists, and we do not need to do anything. + LOG.trace("mkdirs: {} already exists, ignoring creation failure", resourceId, e); + } + return; + } + + resourceId = resourceId.toDirectoryId(); + + // TODO: Before creating a leaf directory we need to check if there are no conflicting files + // TODO: with the same name as any subdirectory + + // Create only a leaf directory because subdirectories will be inferred + // if leaf directory exists + try { + gcs.createEmptyObject(resourceId); + } catch (FileAlreadyExistsException e) { + // This means that the directory object already exists, and we do not need to do anything. + LOG.trace("mkdirs: {} already exists, ignoring creation failure", resourceId, e); + } + } + + void delete(URI path, boolean recursive) throws IOException { + checkNotNull(path, "path should not be null"); + checkArgument(!path.equals(GCSROOT), "Cannot delete root path (%s)", path); + + FileInfo fileInfo = getFileInfo(path); + if (!fileInfo.exists()) { + throw new FileNotFoundException("Item not found: " + path); + } + + List<FileInfo> itemsToDelete; + // Delete sub-items if it is a directory. + if (fileInfo.isDirectory()) { + itemsToDelete = + recursive + ? listRecursive(fileInfo.getPath()) // TODO: Get only one result + : listDirectory(fileInfo.getPath()); + + if (!itemsToDelete.isEmpty() && !recursive) { + throw new DirectoryNotEmptyException("Cannot delete a non-empty directory: " + path); + } + } else { + itemsToDelete = new ArrayList<>(); + } + + List<FileInfo> bucketsToDelete = new ArrayList<>(); + (fileInfo.getItemInfo().isBucket() ?
bucketsToDelete : itemsToDelete).add(fileInfo); + + deleteObjects(itemsToDelete, bucketsToDelete); + + StorageResourceId parentId = + StorageResourceId.fromUriPath(UriPaths.getParentPath(path), true); + GoogleCloudStorageItemInfo parentInfo = + getFileInfoInternal(parentId, /* inferImplicitDirectories= */ false); + + StorageResourceId resourceId = parentInfo.getResourceId(); + if (parentInfo.exists() + || resourceId.isRoot() + || resourceId.isBucket() + || PATH_DELIMITER.equals(resourceId.getObjectName())) { + return; + } + + // TODO: Keep the repair parent step behind a flag + gcs.createEmptyObject(parentId); + } + + private List<FileInfo> listRecursive(URI prefix) throws IOException { + StorageResourceId prefixId = getPrefixId(prefix); + List<GoogleCloudStorageItemInfo> itemInfos = + gcs.listDirectoryRecursive(prefixId.getBucketName(), prefixId.getObjectName()); + List<FileInfo> fileInfos = FileInfo.fromItemInfos(itemInfos); + fileInfos.sort(FILE_INFO_PATH_COMPARATOR); + return fileInfos; + } + + private List<FileInfo> listDirectory(URI prefix) throws IOException { + StorageResourceId prefixId = getPrefixId(prefix); + List<GoogleCloudStorageItemInfo> itemInfos = gcs.listObjectInfo( + prefixId.getBucketName(), + prefixId.getObjectName(), + ListObjectOptions.DEFAULT_FLAT_LIST); + + List<FileInfo> fileInfos = FileInfo.fromItemInfos(itemInfos); + fileInfos.sort(FILE_INFO_PATH_COMPARATOR); + return fileInfos; + } + + private StorageResourceId getPrefixId(URI prefix) { + checkNotNull(prefix, "prefix must not be null"); + + StorageResourceId prefixId = StorageResourceId.fromUriPath(prefix, true); + checkArgument(!prefixId.isRoot(), "prefix must not be global root, got '%s'", prefix); + + return prefixId; + } + + private void deleteObjects( + List<FileInfo> itemsToDelete, List<FileInfo> bucketsToDelete) + throws IOException { + LOG.trace("deleteObjects; fileSize={} bucketSize={}", + itemsToDelete.size(), bucketsToDelete.size()); + deleteObjects(itemsToDelete); + deleteBucket(bucketsToDelete); + } + + private void deleteObjects(List<FileInfo> itemsToDelete) throws IOException { + // Delete children before their parents. + // + // Note: we modify the input list, which is ok for current usage. + // We should make a copy in case that changes in future.
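+ //
+ // FILE_INFO_PATH_COMPARATOR orders shorter (parent) paths first, so
+ // reversed() deletes children before parents, e.g. (illustrative):
+ //   gs://bucket/a/b/file, then gs://bucket/a/b/, then gs://bucket/a/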
+ itemsToDelete.sort(FILE_INFO_PATH_COMPARATOR.reversed()); + + if (!itemsToDelete.isEmpty()) { + List<StorageResourceId> objectsToDelete = new ArrayList<>(itemsToDelete.size()); + for (FileInfo fileInfo : itemsToDelete) { + if (!fileInfo.isInferredDirectory()) { + objectsToDelete.add( + new StorageResourceId( + fileInfo.getItemInfo().getBucketName(), + fileInfo.getItemInfo().getObjectName(), + fileInfo.getItemInfo().getContentGeneration())); + } + } + + gcs.deleteObjects(objectsToDelete); + } + } + + private void deleteBucket(List<FileInfo> bucketsToDelete) throws IOException { + if (bucketsToDelete == null || bucketsToDelete.isEmpty()) { + return; + } + + // TODO: Add support for deleting bucket + throw new UnsupportedOperationException("deleteBucket is not supported."); + } + + public List<FileInfo> listFileInfo(URI path, ListFileOptions listOptions) throws IOException { + checkNotNull(path, "path must not be null"); + LOG.trace("listFileInfo(path: {})", path); + + StorageResourceId pathId = + StorageResourceId.fromUriPath(path, /* allowEmptyObjectName= */ true); + + if (!pathId.isDirectory()) { + GoogleCloudStorageItemInfo pathInfo = gcs.getItemInfo(pathId); + if (pathInfo.exists()) { + List<FileInfo> listedInfo = new ArrayList<>(); + listedInfo.add(FileInfo.fromItemInfo(pathInfo)); + + return listedInfo; + } + } + + StorageResourceId dirId = pathId.toDirectoryId(); + List<GoogleCloudStorageItemInfo> dirItemInfos = dirId.isRoot() ? + gcs.listBucketInfo() : + gcs.listObjectInfo( + dirId.getBucketName(), dirId.getObjectName(), LIST_FILE_INFO_LIST_OPTIONS); + + if (pathId.isStorageObject() && dirItemInfos.isEmpty()) { + throw new FileNotFoundException("Item not found: " + path); + } + + if (!dirItemInfos.isEmpty() && Objects.equals(dirItemInfos.get(0).getResourceId(), dirId)) { + dirItemInfos.remove(0); + } + + List<FileInfo> fileInfos = FileInfo.fromItemInfos(dirItemInfos); + fileInfos.sort(FILE_INFO_PATH_COMPARATOR); + return fileInfos; + } } diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageItemInfo.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageItemInfo.java index 887e68b05f9..83169b8d921 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageItemInfo.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageItemInfo.java @@ -1,11 +1,13 @@ /* - * Copyright 2013 Google Inc. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java index 1c2fc19d2b5..8831568a356 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java @@ -32,12 +32,16 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; +import java.nio.file.DirectoryNotEmptyException; +import java.util.ArrayList; import java.util.EnumSet; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.ProviderUtils; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; @@ -273,7 +277,6 @@ public FSDataOutputStream create(Path hadoopPath, FsPermission permission, boole checkArgument(replication > 0, "replication must be a positive integer: %s", replication); checkArgument(blockSize > 0, "blockSize must be a positive integer: %s", blockSize); - System.out.println(String.format("create(%s)", hadoopPath)); checkOpen(); LOG.trace("create(hadoopPath: {}, overwrite: {}, bufferSize: {} [ignored])", hadoopPath, @@ -289,10 +292,32 @@ public FSDataOutputStream create(Path hadoopPath, FsPermission permission, boole } @Override - public FSDataOutputStream createNonRecursive(Path hadoopPath, FsPermission permission, - EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize, - Progressable progress) throws IOException { - throw new UnsupportedOperationException(hadoopPath.toString()); + public FSDataOutputStream createNonRecursive( + Path hadoopPath, + FsPermission permission, + EnumSet<CreateFlag> flags, + int bufferSize, + short replication, + long blockSize, + Progressable progress) + throws IOException { + URI gcsPath = getGcsPath(checkNotNull(hadoopPath, "hadoopPath must not be null")); + URI parentGcsPath = UriPaths.getParentPath(gcsPath); + if (!getGcsFs().getFileInfo(parentGcsPath).exists()) { + throw new FileNotFoundException( + String.format( + "Cannot create file '%s' because parent folder does not exist: %s", + gcsPath, parentGcsPath)); + } + + return create( + hadoopPath, + permission, + flags.contains(CreateFlag.OVERWRITE), + bufferSize, + replication, + blockSize, + progress); } @Override @@ -308,19 +333,57 @@ public boolean rename(final Path path, final Path path1) throws IOException { } @Override - public boolean delete(final Path path, final boolean recursive) throws IOException { - LOG.trace("delete({}, {})", path, recursive); - throw new UnsupportedOperationException(path.toString()); + public boolean delete(final Path hadoopPath, final boolean recursive) throws IOException { + LOG.trace("delete({}, {})", hadoopPath, recursive); + checkArgument(hadoopPath != null, "hadoopPath must not be null"); + + checkOpen(); + + URI gcsPath = getGcsPath(hadoopPath); + try { + getGcsFs().delete(gcsPath, recursive); + } catch (DirectoryNotEmptyException e) { + throw e; + } catch (IOException e) { + if
(ApiErrorExtractor.INSTANCE.requestFailure(e)) { + throw e; + } + LOG.trace("delete(hadoopPath: {}, recursive: {}): false [failed]", hadoopPath, recursive, e); + return false; + } + + LOG.trace("delete(hadoopPath: {}, recursive: {}): true", hadoopPath, recursive); + return true; } @Override - public FileStatus[] listStatus(final Path path) throws FileNotFoundException, IOException { - checkArgument(path != null, "hadoopPath must not be null"); + public FileStatus[] listStatus(final Path hadoopPath) throws IOException { + checkArgument(hadoopPath != null, "hadoopPath must not be null"); checkOpen(); - LOG.trace("listStatus(hadoopPath: {})", path); - throw new UnsupportedOperationException(path.toString()); + LOG.trace("listStatus(hadoopPath: {})", hadoopPath); + + URI gcsPath = getGcsPath(hadoopPath); + List<FileStatus> status; + + try { + List<FileInfo> fileInfos = getGcsFs().listFileInfo(gcsPath, ListFileOptions.OBJECTFIELDS); + status = new ArrayList<>(fileInfos.size()); + String userName = getUgiUserName(); + for (FileInfo fileInfo : fileInfos) { + status.add(getFileStatus(fileInfo, userName)); + } + } catch (FileNotFoundException fnfe) { + throw (FileNotFoundException) + new FileNotFoundException( + String.format( + "listStatus(hadoopPath: %s): '%s' does not exist.", + hadoopPath, gcsPath)) + .initCause(fnfe); + } + + return status.toArray(new FileStatus[0]); } /** @@ -402,18 +465,29 @@ public Path getWorkingDirectory() { } @Override - public boolean mkdirs(final Path path, final FsPermission fsPermission) throws IOException { - LOG.trace("mkdirs({})", path); - throw new UnsupportedOperationException(path.toString()); - } + public boolean mkdirs(final Path hadoopPath, final FsPermission permission) throws IOException { + checkArgument(hadoopPath != null, "hadoopPath must not be null"); -// /** -// * Gets the default replication factor. -// */ -// @Override -// public short getDefaultReplication() { -// return REPLICATION_FACTOR_DEFAULT; -// } + LOG.trace( + "mkdirs(hadoopPath: {}, permission: {})", hadoopPath, permission); + + checkOpen(); + + URI gcsPath = getGcsPath(hadoopPath); + try { + getGcsFs().mkdirs(gcsPath); + } catch (java.nio.file.FileAlreadyExistsException faee) { + // Need to convert to the Hadoop flavor of FileAlreadyExistsException. + throw (FileAlreadyExistsException) + new FileAlreadyExistsException( + String.format( + "mkdirs(hadoopPath: %s, permission: %s): failed", + hadoopPath, permission)) + .initCause(faee); + } + + return true; + } @Override public FileStatus getFileStatus(final Path path) throws IOException { @@ -423,9 +497,14 @@ public FileStatus getFileStatus(final Path path) throws IOException { URI gcsPath = getGcsPath(path); - LOG.trace("getFileStatus(): {}", gcsPath); - - throw new UnsupportedOperationException(path.toString()); + FileInfo fileInfo = getGcsFs().getFileInfo(gcsPath); + if (!fileInfo.exists()) { + throw new FileNotFoundException( + String.format( + "%s not found: %s", fileInfo.isDirectory() ?
"Directory" : "File", path)); + } + String userName = getUgiUserName(); + return getFileStatus(fileInfo, userName); } /** @@ -502,4 +581,29 @@ public void setWorkingDirectory(final Path hadoopPath) { workingDirectory = getHadoopPath(gcsPath); LOG.trace("setWorkingDirectory(hadoopPath: {}): {}", hadoopPath, workingDirectory); } + + + private static String getUgiUserName() throws IOException { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + return ugi.getShortUserName(); + } + + private FileStatus getFileStatus(FileInfo fileInfo, String userName) { + checkNotNull(fileInfo, "fileInfo should not be null"); + // GCS does not provide modification time. It only provides creation time. + // It works for objects because they are immutable once created. + FileStatus status = new FileStatus( + fileInfo.getSize(), + fileInfo.isDirectory(), + REPLICATION_FACTOR_DEFAULT, + defaultBlockSize, + fileInfo.getModificationTime(), + fileInfo.getModificationTime(), + reportedPermissions, + userName, + userName, + getHadoopPath(fileInfo.getPath())); + LOG.trace("FileStatus(path: {}, userName: {}): {}", fileInfo.getPath(), userName, status); + return status; + } } diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java index 16d940b16f4..a480a72e60b 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java @@ -34,28 +34,29 @@ class GoogleHadoopFileSystemConfiguration { * querying the value. Modifying this value allows one to control how many mappers are used to * process a given file. */ - public static final HadoopConfigurationProperty<Long> BLOCK_SIZE = + static final HadoopConfigurationProperty<Long> BLOCK_SIZE = new HadoopConfigurationProperty<>("fs.gs.block.size", 64 * 1024 * 1024L); /** * Configuration key for GCS project ID. Default value: none */ - public static final HadoopConfigurationProperty<String> GCS_PROJECT_ID = + static final HadoopConfigurationProperty<String> GCS_PROJECT_ID = new HadoopConfigurationProperty<>("fs.gs.project.id"); /** * Configuration key for initial working directory of a GHFS instance. Default value: '/' */ - public static final HadoopConfigurationProperty<String> GCS_WORKING_DIRECTORY = + static final HadoopConfigurationProperty<String> GCS_WORKING_DIRECTORY = new HadoopConfigurationProperty<>("fs.gs.working.dir", "/"); /** * Configuration key for setting write buffer size. 
*/ - public static final HadoopConfigurationProperty<Long> GCS_OUTPUT_STREAM_BUFFER_SIZE = + static final HadoopConfigurationProperty<Long> GCS_OUTPUT_STREAM_BUFFER_SIZE = new HadoopConfigurationProperty<>("fs.gs.outputstream.buffer.size", 8L * 1024 * 1024); private final String workingDirectory; + private final String projectId; public int getOutStreamBufferSize() { return outStreamBufferSize; @@ -67,9 +68,18 @@ public int getOutStreamBufferSize() { this.workingDirectory = GCS_WORKING_DIRECTORY.get(config, config::get); this.outStreamBufferSize = toIntExact(GCS_OUTPUT_STREAM_BUFFER_SIZE.get(config, config::getLongBytes)); + this.projectId = GCS_PROJECT_ID.get(config, config::get); } public String getWorkingDirectory() { return this.workingDirectory; } + + String getProjectId() { + return this.projectId; + } + + public long getMaxListItemsPerCall() { + return 5000L; // TODO: Make this configurable + } } diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/IoExceptionHelper.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/IoExceptionHelper.java new file mode 100644 index 00000000000..c68a6cac1a1 --- /dev/null +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/IoExceptionHelper.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.gs; + +import java.io.IOError; +import java.io.IOException; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import javax.net.ssl.SSLException; + +/** + * Classifies low-level exceptions raised by storage API calls into higher-level meaning: + * IO errors, socket errors, and read timeouts. + */ +public final class IoExceptionHelper { + + private IoExceptionHelper() {} + + /** + * Determines if a given {@link Throwable} is caused by an IO error. + * + * <p>Recursively checks {@code getCause()} if outer exception isn't an instance of the correct + * class. + * + * @param throwable The {@link Throwable} to check. + * @return True if the {@link Throwable} is a result of an IO error. + */ + public static boolean isIoError(Throwable throwable) { + if (throwable instanceof IOException || throwable instanceof IOError) { + return true; + } + Throwable cause = throwable.getCause(); + return cause != null && isIoError(cause); + } + + /** + * Determines if a given {@link Throwable} is caused by a socket error. + * + * <p>Recursively checks {@code getCause()} if outer exception isn't an instance of the correct + * class. + * + * @param throwable The {@link Throwable} to check. + * @return True if the {@link Throwable} is a result of a socket error.
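+   *
+   * <p>Illustrative only ({@code readOnce} and {@code backoff} are hypothetical helpers, not
+   * part of this class): a caller might use this classification to decide whether a failed
+   * call is worth retrying.
+   *
+   * <pre>{@code
+   * try {
+   *   readOnce();
+   * } catch (IOException e) {
+   *   if (IoExceptionHelper.isSocketError(e)) {
+   *     backoff();   // transient transport failure; assumed safe to retry once
+   *     readOnce();
+   *   } else {
+   *     throw e;     // not a socket-level failure; propagate
+   *   }
+   * }
+   * }</pre>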
+ */ + public static boolean isSocketError(Throwable throwable) { + if (throwable instanceof SocketException || throwable instanceof SocketTimeoutException) { + return true; + } + Throwable cause = throwable.getCause(); + // SSL exceptions caused by IO errors (e.g. an SSLHandshakeException due to unexpected + // connection closure) are also treated as socket errors. + if (throwable instanceof SSLException && cause != null && isIoError(cause)) { + return true; + } + return cause != null && isSocketError(cause); + } + + /** + * Determines if a given {@link IOException} is caused by a timed out read. + * + * @param e The {@link IOException} to check. + * @return True if the {@link IOException} is a result of a read timeout. + */ + public static boolean isReadTimedOut(IOException e) { + // Compare with the constant as the receiver so a null message is handled safely. + return e instanceof SocketTimeoutException && "Read timed out".equalsIgnoreCase(e.getMessage()); + } +} diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java new file mode 100644 index 00000000000..2bc74c6fc21 --- /dev/null +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.gs; + +import javax.annotation.Nonnull; + +final class ListFileOptions { + static final ListFileOptions OBJECTFIELDS = new ListFileOptions("bucket,name,size,updated"); + private final String fields; + + private ListFileOptions(@Nonnull String fields) { + this.fields = fields; + } + + String getFields() { + return fields; + } +} diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListObjectOptions.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListObjectOptions.java new file mode 100644 index 00000000000..60ec409b5c7 --- /dev/null +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListObjectOptions.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.gs; + +import javax.annotation.Nullable; + +import static org.apache.hadoop.fs.gs.Constants.PATH_DELIMITER; + +/** Options that can be specified when listing objects in the {@link GoogleCloudStorage}. */ +final class ListObjectOptions { + + /** List all objects in the directory. */ + public static final ListObjectOptions DEFAULT = new Builder().build(); + + /** List all objects with the prefix. */ + public static final ListObjectOptions DEFAULT_FLAT_LIST = + DEFAULT.builder().setDelimiter(null).build(); + + Builder builder() { + Builder result = new Builder(); + result.fields = fields; + result.delimiter = delimiter; + result.maxResults = maxResult; + result.includePrefix = includePrefix; + + return result; + } + + private final String delimiter; + private final boolean includePrefix; + private final long maxResult; + private final String fields; + + private ListObjectOptions(Builder builder) { + this.delimiter = builder.delimiter; + this.includePrefix = builder.includePrefix; + this.maxResult = builder.maxResults; + this.fields = builder.fields; + } + + /** Delimiter to use (typically {@code /}), otherwise {@code null}. */ + @Nullable + String getDelimiter() { + return delimiter; + } + + /** Whether to include prefix object in the result. */ + boolean isIncludePrefix() { + return includePrefix; + } + + /** Maximum number of results to return, unlimited if negative or zero. */ + long getMaxResults() { + return maxResult; + } + + /** + * Comma separated list of object fields to include in the list response. + * + * <p>See <a + * href="https://cloud.google.com/storage/docs/json_api/v1/objects#resource-representations"> + * object resource</a> for reference. + */ + @Nullable + String getFields() { + return fields; + } + + static class Builder { + private static final int MAX_RESULTS_UNLIMITED = -1; + + static final String OBJECT_FIELDS = + String.join( + /* delimiter= */ ",", + "bucket", + "name", + "timeCreated", + "updated", + "generation", + "metageneration", + "size", + "contentType", + "contentEncoding", + "md5Hash", + "crc32c", + "metadata"); + + private String delimiter; + private boolean includePrefix; + + private long maxResults; + + private String fields; + + Builder() { + this.delimiter = PATH_DELIMITER; + this.includePrefix = false; + this.maxResults = MAX_RESULTS_UNLIMITED; + this.fields = OBJECT_FIELDS; + } + public Builder setDelimiter(String d) { + this.delimiter = d; + return this; + } + + public Builder setIncludePrefix(boolean value) { + this.includePrefix = value; + return this; + } + + public Builder setMaxResults(long mr) { + this.maxResults = mr; + return this; + } + + public Builder setFields(String f) { + this.fields = f; + return this; + } + + public ListObjectOptions build() { + return new ListObjectOptions(this); + } + } +} diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/VerificationAttributes.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/VerificationAttributes.java index 4155482fc7d..03de0a52e37 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/VerificationAttributes.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/VerificationAttributes.java @@ -1,11 +1,13 @@ /* - * Copyright 2016 Google Inc. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestConfiguration.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestConfiguration.java new file mode 100644 index 00000000000..f205276d372 --- /dev/null +++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestConfiguration.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.gs; + +/** Access to test configurations values. */ +public abstract class TestConfiguration { + public static final String GCS_TEST_PROJECT_ID = "GCS_TEST_PROJECT_ID"; + public static final String GCS_TEST_JSON_KEYFILE = "GCS_TEST_JSON_KEYFILE"; + + public static final String GCS_TEST_DIRECT_PATH_PREFERRED = "GCS_TEST_DIRECT_PATH_PREFERRED"; + + /** Environment-based test configuration. 
*/ + public static class EnvironmentBasedTestConfiguration extends TestConfiguration { + @Override + public String getProjectId() { + return System.getenv(GCS_TEST_PROJECT_ID); + } + + @Override + public String getServiceAccountJsonKeyFile() { + return System.getenv(GCS_TEST_JSON_KEYFILE); + } + + @Override + public boolean isDirectPathPreferred() { + String envVar = System.getenv(GCS_TEST_DIRECT_PATH_PREFERRED); + // If the env variable is not configured, the default behaviour is to attempt directPath. + if (envVar == null) { + return true; + } + return Boolean.parseBoolean(envVar); + } + } + + public static TestConfiguration getInstance() { + return LazyHolder.INSTANCE; + } + + private static class LazyHolder { + private static final TestConfiguration INSTANCE = new EnvironmentBasedTestConfiguration(); + } + + public abstract String getProjectId(); + + public abstract String getServiceAccountJsonKeyFile(); + + public abstract boolean isDirectPathPreferred(); +} diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestStorageResourceId.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestStorageResourceId.java index e0a39b2d7e4..e027c7b4091 100644 --- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestStorageResourceId.java +++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestStorageResourceId.java @@ -1,11 +1,13 @@ /* - * Copyright 2013 Google Inc. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestStringPaths.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestStringPaths.java index 16234e0ce1d..a6b64ff7cff 100644 --- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestStringPaths.java +++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestStringPaths.java @@ -1,11 +1,13 @@ /* - * Copyright 2013 Google Inc. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestUriPaths.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestUriPaths.java index fe93a28dc43..0325df52f9b 100644 --- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestUriPaths.java +++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/TestUriPaths.java @@ -1,11 +1,13 @@ /* - * Copyright 2013 Google Inc. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/GoogleContract.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/GoogleContract.java new file mode 100644 index 00000000000..aa131981caf --- /dev/null +++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/GoogleContract.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.gs.contract; + +import org.apache.hadoop.fs.gs.TestConfiguration; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractBondedFSContract; + +/** Contract of GoogleHadoopFileSystem via scheme "gs". 
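+ *
+ * <p>A sketch of how the ITest classes in this package bind a contract test to this
+ * contract ({@code ITestGoogleContractExample} is a hypothetical name; the base class is
+ * one of the hadoop-common contract tests used below):
+ *
+ * <pre>{@code
+ * public class ITestGoogleContractExample extends AbstractContractDeleteTest {
+ *   @Override
+ *   protected AbstractFSContract createContract(Configuration conf) {
+ *     return new GoogleContract(conf);
+ *   }
+ * }
+ * }</pre>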
*/ +public class GoogleContract extends AbstractBondedFSContract { + private static final String CONTRACT_XML = "contract/gs.xml"; + + public GoogleContract(Configuration conf) { + super(conf); + addConfResource(CONTRACT_XML); + conf.set("fs.contract.test.fs.gs", "gs://arunchacko-oss-test-bucket"); // TODO: + + TestConfiguration testConf = TestConfiguration.getInstance(); + if (testConf.getProjectId() != null) { + conf.set("fs.gs.project.id", testConf.getProjectId()); + } + } + + @Override + public String getScheme() { + return "gs"; + } +} diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractDelete.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractDelete.java new file mode 100644 index 00000000000..7ed3834025c --- /dev/null +++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractDelete.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.gs.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractDeleteTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.ContractTestUtils; + +public class ITestGoogleContractDelete extends AbstractContractDeleteTest { + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new GoogleContract(conf); + } + + @Override + public void testDeleteEmptyDirNonRecursive() { + // TODO: Enable this + ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change"); + } +} diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractGetFileStatus.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractGetFileStatus.java new file mode 100644 index 00000000000..aae16c2a410 --- /dev/null +++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractGetFileStatus.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.gs.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +public class ITestGoogleContractGetFileStatus extends AbstractContractGetFileStatusTest { + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new GoogleContract(conf); + } +} diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java new file mode 100644 index 00000000000..26181f20385 --- /dev/null +++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.gs.contract; + +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractMkdirTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +public class ITestGoogleContractMkdir extends AbstractContractMkdirTest { + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new GoogleContract(conf); + } + + @Override + public void testMkdirsDoesNotRemoveParentDirectories() { + // TODO: Enable this + ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change"); + } + + @Override + public void testCreateDirWithExistingDir() { + // TODO: Enable this + ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change"); + } + + @Override + public void testMkDirRmDir() { + // TODO: Enable this + ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change"); + } + + @Override + public void testNoMkdirOverFile() { + // TODO: Enable this + ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change"); + } + + @Override + public void testMkdirOverParentFile() { + // TODO: Enable this + ContractTestUtils.skip("Skipping the test. 
This will be enabled in a subsequent change"); + } +} diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/package-info.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/package-info.java new file mode 100644 index 00000000000..8806dc9f45b --- /dev/null +++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Google Cloud Storage Filesystem contract tests. + */ +package org.apache.hadoop.fs.gs.contract; \ No newline at end of file