This is an automated email from the ASF dual-hosted git repository.

cnauroth pushed a commit to branch HADOOP-19343
in repository https://gitbox.apache.org/repos/asf/hadoop.git
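The commit below adds open() and rename() support to the gs:// connector. As a rough, illustrative sketch only (not part of this patch; the bucket and object names are placeholders, and it assumes this module's connector is registered for the gs scheme on the client classpath), these operations surface through the standard Hadoop FileSystem API roughly as follows:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class GsOpenRenameExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder bucket; any reachable GCS bucket the caller can access would do.
        FileSystem fs = FileSystem.get(URI.create("gs://example-bucket/"), conf);

        Path src = new Path("gs://example-bucket/data/input.txt");
        Path dst = new Path("gs://example-bucket/data/renamed.txt");

        // open() returns an FSDataInputStream; in this patch the stream is backed by a
        // seekable read channel, so seeks and reads go through GoogleHadoopFSInputStream.
        try (FSDataInputStream in = fs.open(src)) {
          byte[] buf = new byte[4096];
          int read = in.read(buf, 0, buf.length);
          in.seek(0); // a backward seek; adaptive fadvise may treat the stream as random I/O
          System.out.println("first read returned " + read + " bytes");
        }

        // rename() within one bucket maps to an object move; across buckets it is
        // implemented as copy followed by delete of the source.
        boolean renamed = fs.rename(src, dst);
        System.out.println("rename succeeded: " + renamed);
      }
    }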
commit 1b12f77c8a21d0879ea9cb7f719235fadaeb0e53 Author: Arunkumar Chacko <aruncha...@google.com> AuthorDate: Wed Jun 18 04:21:07 2025 +0000 HADOOP-19343: Add support for open() and rename() Closes #7742 Signed-off-by: Chris Nauroth <cnaur...@apache.org> --- .../fs/gs/{ListFileOptions.java => Fadvise.java} | 19 +- .../hadoop/fs/gs/FileAccessPatternManager.java | 184 +++++++ .../apache/hadoop/fs/gs/GoogleCloudStorage.java | 289 ++++++++++ .../fs/gs/GoogleCloudStorageClientReadChannel.java | 609 +++++++++++++++++++++ .../hadoop/fs/gs/GoogleCloudStorageExceptions.java | 58 ++ .../hadoop/fs/gs/GoogleCloudStorageFileSystem.java | 322 ++++++++++- .../hadoop/fs/gs/GoogleHadoopFSInputStream.java | 187 +++++++ .../hadoop/fs/gs/GoogleHadoopFileSystem.java | 45 +- .../fs/gs/GoogleHadoopFileSystemConfiguration.java | 120 +++- .../src/main/java/org/apache/hadoop/fs/gs/Gs.java | 64 +++ .../org/apache/hadoop/fs/gs/ListFileOptions.java | 4 + .../fs/gs/contract/ITestGoogleContractOpen.java} | 21 +- .../fs/gs/contract/ITestGoogleContractRename.java | 43 ++ .../fs/gs/contract/ITestGoogleContractSeek.java} | 20 +- 14 files changed, 1928 insertions(+), 57 deletions(-) diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Fadvise.java similarity index 72% copy from hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java copy to hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Fadvise.java index 2bc74c6fc21..6bc82324ddf 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Fadvise.java @@ -1,3 +1,5 @@ +package org.apache.hadoop.fs.gs; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -16,19 +18,6 @@ * limitations under the License. */ -package org.apache.hadoop.fs.gs; - -import javax.annotation.Nonnull; - -final class ListFileOptions { - static final ListFileOptions OBJECTFIELDS = new ListFileOptions("bucket,name,size,updated"); - private final String fields; - - private ListFileOptions(@Nonnull String fields) { - this.fields = fields; - } - - String getFields() { - return fields; - } +enum Fadvise { + RANDOM, SEQUENTIAL, AUTO, AUTO_RANDOM } diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/FileAccessPatternManager.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/FileAccessPatternManager.java new file mode 100644 index 00000000000..e653e2d4bf1 --- /dev/null +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/FileAccessPatternManager.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.gs; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages the access pattern of object being read from cloud storage. For adaptive fadvise + * configurations it computes the access pattern based on previous requests. + */ +class FileAccessPatternManager { + private static final Logger LOG = LoggerFactory.getLogger(FileAccessPatternManager.class); + private final StorageResourceId resourceId; + private final GoogleHadoopFileSystemConfiguration config; + private final Fadvise fadvise; + private boolean isPatternOverriden; + private boolean randomAccess; + // keeps track of any backward seek requested in lifecycle of InputStream + private boolean isBackwardSeekRequested = false; + // keeps track of any backward seek requested in lifecycle of InputStream + private boolean isForwardSeekRequested = false; + private long lastServedIndex = -1; + // Keeps track of distance between consecutive requests + private int consecutiveSequentialCount = 0; + + FileAccessPatternManager( + StorageResourceId resourceId, GoogleHadoopFileSystemConfiguration configuration) { + this.isPatternOverriden = false; + this.resourceId = resourceId; + this.config = configuration; + this.fadvise = config.getFadvise(); + this.randomAccess = fadvise == Fadvise.AUTO_RANDOM || fadvise == Fadvise.RANDOM; + } + + void updateLastServedIndex(long position) { + this.lastServedIndex = position; + } + + boolean shouldAdaptToRandomAccess() { + return randomAccess; + } + + void updateAccessPattern(long currentPosition) { + if (isPatternOverriden) { + LOG.trace("Will bypass computing access pattern as it's overriden for resource :{}", + resourceId); + return; + } + updateSeekFlags(currentPosition); + if (fadvise == Fadvise.AUTO_RANDOM) { + if (randomAccess) { + if (shouldAdaptToSequential(currentPosition)) { + unsetRandomAccess(); + } + } else { + if (shouldAdaptToRandomAccess(currentPosition)) { + setRandomAccess(); + } + } + } else if (fadvise == Fadvise.AUTO) { + if (shouldAdaptToRandomAccess(currentPosition)) { + setRandomAccess(); + } + } + } + + /** + * This provides a way to override the access isRandomPattern, once overridden it will not be + * recomputed for adaptive fadvise types. 
+ * + * @param isRandomPattern, true, to override with random access else false + */ + void overrideAccessPattern(boolean isRandomPattern) { + this.isPatternOverriden = true; + this.randomAccess = isRandomPattern; + LOG.trace( + "Overriding the random access pattern to %s for fadvise:%s for resource: %s ", + isRandomPattern, fadvise, resourceId); + } + + private boolean shouldAdaptToSequential(long currentPosition) { + if (lastServedIndex != -1) { + long distance = currentPosition - lastServedIndex; + if (distance < 0 || distance > config.getInplaceSeekLimit()) { + consecutiveSequentialCount = 0; + } else { + consecutiveSequentialCount++; + } + } + + if (!shouldDetectSequentialAccess()) { + return false; + } + + if (consecutiveSequentialCount < config.getFadviseRequestTrackCount()) { + return false; + } + LOG.trace( + "Detected {} consecutive read request within distance threshold {} with fadvise: {} " + + "switching to sequential IO for '{}'", + consecutiveSequentialCount, + config.getInplaceSeekLimit(), + fadvise, + resourceId); + return true; + } + + private boolean shouldAdaptToRandomAccess(long currentPosition) { + if (!shouldDetectRandomAccess()) { + return false; + } + if (lastServedIndex == -1) { + return false; + } + + if (isBackwardOrForwardSeekRequested()) { + LOG.trace( + "Backward or forward seek requested, isBackwardSeek: {}, isForwardSeek:{} for '{}'", + isBackwardSeekRequested, isForwardSeekRequested, resourceId); + return true; + } + return false; + } + + private boolean shouldDetectSequentialAccess() { + return randomAccess + && !isBackwardOrForwardSeekRequested() + && consecutiveSequentialCount >= config.getFadviseRequestTrackCount() + && fadvise == Fadvise.AUTO_RANDOM; + } + + private boolean shouldDetectRandomAccess() { + return !randomAccess && (fadvise == Fadvise.AUTO || fadvise == Fadvise.AUTO_RANDOM); + } + + private void setRandomAccess() { + randomAccess = true; + } + + private void unsetRandomAccess() { + randomAccess = false; + } + + private boolean isBackwardOrForwardSeekRequested() { + return isBackwardSeekRequested || isForwardSeekRequested; + } + + private void updateSeekFlags(long currentPosition) { + if (lastServedIndex == -1) { + return; + } + + if (currentPosition < lastServedIndex) { + isBackwardSeekRequested = true; + LOG.trace( + "Detected backward read from {} to {} position, updating to backwardSeek for '{}'", + lastServedIndex, currentPosition, resourceId); + + } else if (lastServedIndex + config.getInplaceSeekLimit() < currentPosition) { + isForwardSeekRequested = true; + LOG.trace( + "Detected forward read from {} to {} position over {} threshold," + + " updated to forwardSeek for '{}'", + lastServedIndex, currentPosition, config.getInplaceSeekLimit(), resourceId); + } + } +} diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java index d68eca6a8a5..89a86eef8ff 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.*; import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.isNullOrEmpty; import static java.lang.Math.toIntExact; +import static org.apache.hadoop.fs.gs.GoogleCloudStorageExceptions.createFileNotFoundException; import com.google.api.client.util.BackOff; 
import com.google.api.client.util.ExponentialBackOff; @@ -34,7 +35,9 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.channels.SeekableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.FileAlreadyExistsException; import java.time.Duration; @@ -582,6 +585,292 @@ private List<Bucket> listBucketsInternal() throws IOException { return allBuckets; } + public SeekableByteChannel open(GoogleCloudStorageItemInfo itemInfo, + GoogleHadoopFileSystemConfiguration config) throws IOException { + LOG.trace("open({})", itemInfo); + checkNotNull(itemInfo, "itemInfo should not be null"); + + StorageResourceId resourceId = itemInfo.getResourceId(); + checkArgument( + resourceId.isStorageObject(), "Expected full StorageObject id, got %s", resourceId); + + return open(resourceId, itemInfo, config); + } + + private SeekableByteChannel open( + StorageResourceId resourceId, + GoogleCloudStorageItemInfo itemInfo, + GoogleHadoopFileSystemConfiguration config) + throws IOException { + return new GoogleCloudStorageClientReadChannel( + storage, + itemInfo == null ? getItemInfo(resourceId) : itemInfo, + config); + } + + public void move(Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap) + throws IOException { + validateMoveArguments(sourceToDestinationObjectsMap); + + if (sourceToDestinationObjectsMap.isEmpty()) { + return; + } + + for (Map.Entry<StorageResourceId, StorageResourceId> entry : + sourceToDestinationObjectsMap.entrySet()) { + StorageResourceId srcObject = entry.getKey(); + StorageResourceId dstObject = entry.getValue(); + // TODO: Do this concurrently + moveInternal( + srcObject.getBucketName(), + srcObject.getGenerationId(), + srcObject.getObjectName(), + dstObject.getGenerationId(), + dstObject.getObjectName()); + } + } + + private void moveInternal( + String srcBucketName, + long srcContentGeneration, + String srcObjectName, + long dstContentGeneration, + String dstObjectName) throws IOException { + Storage.MoveBlobRequest.Builder moveRequestBuilder = + createMoveRequestBuilder( + srcBucketName, + srcObjectName, + dstObjectName, + srcContentGeneration, + dstContentGeneration); + try { + String srcString = StringPaths.fromComponents(srcBucketName, srcObjectName); + String dstString = StringPaths.fromComponents(srcBucketName, dstObjectName); + + Blob movedBlob = storage.moveBlob(moveRequestBuilder.build()); + if (movedBlob != null) { + LOG.trace("Successfully moved {} to {}", srcString, dstString); + } + } catch (StorageException e) { + if (ErrorTypeExtractor.getErrorType(e) == ErrorTypeExtractor.ErrorType.NOT_FOUND) { + throw createFileNotFoundException(srcBucketName, srcObjectName, new IOException(e)); + } else { + throw + new IOException( + String.format( + "Error moving '%s'", + StringPaths.fromComponents(srcBucketName, srcObjectName)), + e); + } + } + } + + /** Creates a builder for a blob move request. 
*/ + private Storage.MoveBlobRequest.Builder createMoveRequestBuilder( + String srcBucketName, + String srcObjectName, + String dstObjectName, + long srcContentGeneration, + long dstContentGeneration) { + + Storage.MoveBlobRequest.Builder moveRequestBuilder = + Storage.MoveBlobRequest.newBuilder().setSource(BlobId.of(srcBucketName, srcObjectName)); + moveRequestBuilder.setTarget(BlobId.of(srcBucketName, dstObjectName)); + + List<Storage.BlobTargetOption> blobTargetOptions = new ArrayList<>(); + List<Storage.BlobSourceOption> blobSourceOptions = new ArrayList<>(); + + if (srcContentGeneration != StorageResourceId.UNKNOWN_GENERATION_ID) { + blobSourceOptions.add(Storage.BlobSourceOption.generationMatch(srcContentGeneration)); + } + + if (dstContentGeneration != StorageResourceId.UNKNOWN_GENERATION_ID) { + blobTargetOptions.add(Storage.BlobTargetOption.generationMatch(dstContentGeneration)); + } + + // TODO: Add encryption support + + moveRequestBuilder.setSourceOptions(blobSourceOptions); + moveRequestBuilder.setTargetOptions(blobTargetOptions); + + return moveRequestBuilder; + } + + /** + * Validates basic argument constraints like non-null, non-empty Strings, using {@code + * Preconditions} in addition to checking for src/dst bucket equality. + */ + public static void validateMoveArguments( + Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap) throws IOException { + checkNotNull(sourceToDestinationObjectsMap, "srcObjects must not be null"); + + if (sourceToDestinationObjectsMap.isEmpty()) { + return; + } + + for (Map.Entry<StorageResourceId, StorageResourceId> entry : + sourceToDestinationObjectsMap.entrySet()) { + StorageResourceId source = entry.getKey(); + StorageResourceId destination = entry.getValue(); + String srcBucketName = source.getBucketName(); + String dstBucketName = destination.getBucketName(); + // Avoid move across buckets. 
+ if (!srcBucketName.equals(dstBucketName)) { + throw new UnsupportedOperationException( + "This operation is not supported across two different buckets."); + } + checkArgument( + !isNullOrEmpty(source.getObjectName()), "srcObjectName must not be null or empty"); + checkArgument( + !isNullOrEmpty(destination.getObjectName()), "dstObjectName must not be null or empty"); + if (srcBucketName.equals(dstBucketName) + && source.getObjectName().equals(destination.getObjectName())) { + throw new IllegalArgumentException( + String.format( + "Move destination must be different from source for %s.", + StringPaths.fromComponents(srcBucketName, source.getObjectName()))); + } + } + } + + void copy(Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap) + throws IOException { + validateCopyArguments(sourceToDestinationObjectsMap, this); + + if (sourceToDestinationObjectsMap.isEmpty()) { + return; + } + + for (Map.Entry<StorageResourceId, StorageResourceId> entry : + sourceToDestinationObjectsMap.entrySet()) { + StorageResourceId srcObject = entry.getKey(); + StorageResourceId dstObject = entry.getValue(); + // TODO: Do this concurrently + copyInternal( + srcObject.getBucketName(), + srcObject.getObjectName(), + dstObject.getGenerationId(), + dstObject.getBucketName(), + dstObject.getObjectName()); + } + } + + private void copyInternal( + String srcBucketName, + String srcObjectName, + long dstContentGeneration, + String dstBucketName, + String dstObjectName) throws IOException { + Storage.CopyRequest.Builder copyRequestBuilder = + Storage.CopyRequest.newBuilder().setSource(BlobId.of(srcBucketName, srcObjectName)); + if (dstContentGeneration != StorageResourceId.UNKNOWN_GENERATION_ID) { + copyRequestBuilder.setTarget( + BlobId.of(dstBucketName, dstObjectName), + Storage.BlobTargetOption.generationMatch(dstContentGeneration)); + } else { + copyRequestBuilder.setTarget(BlobId.of(dstBucketName, dstObjectName)); + } + + // TODO: Add support for encryption key + if (configuration.getMaxRewriteChunkSize() > 0) { + copyRequestBuilder.setMegabytesCopiedPerChunk( + // Convert raw byte size into Mib. + configuration.getMaxRewriteChunkSize() / (1024 * 1024)); + } + + String srcString = StringPaths.fromComponents(srcBucketName, srcObjectName); + String dstString = StringPaths.fromComponents(dstBucketName, dstObjectName); + + try { + CopyWriter copyWriter = storage.copy(copyRequestBuilder.build()); + while (!copyWriter.isDone()) { + copyWriter.copyChunk(); + LOG.trace( + "Copy ({} to {}) did not complete. 
Resuming...", srcString, dstString); + } + LOG.trace("Successfully copied {} to {}", srcString, dstString); + } catch (StorageException e) { + if (ErrorTypeExtractor.getErrorType(e) == ErrorTypeExtractor.ErrorType.NOT_FOUND) { + throw createFileNotFoundException(srcBucketName, srcObjectName, new IOException(e)); + } else { + throw new IOException(String.format("copy(%s->%s) failed.", srcString, dstString), e); + } + } + } + + public static void validateCopyArguments( + Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap, + GoogleCloudStorage gcsImpl) + throws IOException { + checkNotNull(sourceToDestinationObjectsMap, "srcObjects must not be null"); + + if (sourceToDestinationObjectsMap.isEmpty()) { + return; + } + + Map<StorageResourceId, GoogleCloudStorageItemInfo> bucketInfoCache = new HashMap<>(); + + for (Map.Entry<StorageResourceId, StorageResourceId> entry : + sourceToDestinationObjectsMap.entrySet()) { + StorageResourceId source = entry.getKey(); + StorageResourceId destination = entry.getValue(); + String srcBucketName = source.getBucketName(); + String dstBucketName = destination.getBucketName(); + // Avoid copy across locations or storage classes. + if (!srcBucketName.equals(dstBucketName)) { + StorageResourceId srcBucketResourceId = new StorageResourceId(srcBucketName); + GoogleCloudStorageItemInfo srcBucketInfo = + getGoogleCloudStorageItemInfo(gcsImpl, bucketInfoCache, srcBucketResourceId); + if (!srcBucketInfo.exists()) { + throw new FileNotFoundException("Bucket not found: " + srcBucketName); + } + + StorageResourceId dstBucketResourceId = new StorageResourceId(dstBucketName); + GoogleCloudStorageItemInfo dstBucketInfo = + getGoogleCloudStorageItemInfo(gcsImpl, bucketInfoCache, dstBucketResourceId); + if (!dstBucketInfo.exists()) { + throw new FileNotFoundException("Bucket not found: " + dstBucketName); + } + + // TODO: Restrict this only when copy-with-rewrite is enabled + if (!srcBucketInfo.getLocation().equals(dstBucketInfo.getLocation())) { + throw new UnsupportedOperationException( + "This operation is not supported across two different storage locations."); + } + + if (!srcBucketInfo.getStorageClass().equals(dstBucketInfo.getStorageClass())) { + throw new UnsupportedOperationException( + "This operation is not supported across two different storage classes."); + } + } + checkArgument( + !isNullOrEmpty(source.getObjectName()), "srcObjectName must not be null or empty"); + checkArgument( + !isNullOrEmpty(destination.getObjectName()), "dstObjectName must not be null or empty"); + if (srcBucketName.equals(dstBucketName) + && source.getObjectName().equals(destination.getObjectName())) { + throw new IllegalArgumentException( + String.format( + "Copy destination must be different from source for %s.", + StringPaths.fromComponents(srcBucketName, source.getObjectName()))); + } + } + } + + private static GoogleCloudStorageItemInfo getGoogleCloudStorageItemInfo( + GoogleCloudStorage gcsImpl, + Map<StorageResourceId, GoogleCloudStorageItemInfo> bucketInfoCache, + StorageResourceId resourceId) + throws IOException { + GoogleCloudStorageItemInfo storageItemInfo = bucketInfoCache.get(resourceId); + if (storageItemInfo != null) { + return storageItemInfo; + } + storageItemInfo = gcsImpl.getItemInfo(resourceId); + bucketInfoCache.put(resourceId, storageItemInfo); + return storageItemInfo; + } + // Helper class to capture the results of list operation. 
private class ListOperationResult { private final Map<String, Blob> prefixes = new HashMap<>(); diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageClientReadChannel.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageClientReadChannel.java new file mode 100644 index 00000000000..afe16c66701 --- /dev/null +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageClientReadChannel.java @@ -0,0 +1,609 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.gs; + +import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument; +import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkNotNull; +import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkState; +import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.nullToEmpty; +import static java.lang.Math.max; +import static java.lang.Math.min; +import static java.lang.Math.toIntExact; +import static org.apache.hadoop.fs.gs.GoogleCloudStorageExceptions.createFileNotFoundException; + +import com.google.cloud.ReadChannel; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.Storage.BlobSourceOption; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SeekableByteChannel; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nullable; + +/** Provides seekable read access to GCS via java-storage library. */ +class GoogleCloudStorageClientReadChannel implements SeekableByteChannel { + private static final Logger LOG = + LoggerFactory.getLogger(GoogleCloudStorageClientReadChannel.class); + private static final String GZIP_ENCODING = "gzip"; + + private final StorageResourceId resourceId; + private final Storage storage; + private final GoogleHadoopFileSystemConfiguration config; + + // The size of this object generation, in bytes. + private long objectSize; + private ContentReadChannel contentReadChannel; + private boolean gzipEncoded = false; + private boolean open = true; + + // Current position in this channel, it could be different from contentChannelCurrentPosition if + // position(long) method calls were made without calls to read(ByteBuffer) method. 
+ private long currentPosition = 0; + + GoogleCloudStorageClientReadChannel( + Storage storage, + GoogleCloudStorageItemInfo itemInfo, + GoogleHadoopFileSystemConfiguration config) + throws IOException { + validate(itemInfo); + this.storage = storage; + this.resourceId = + new StorageResourceId( + itemInfo.getBucketName(), itemInfo.getObjectName(), itemInfo.getContentGeneration()); + this.contentReadChannel = new ContentReadChannel(config, resourceId); + initMetadata(itemInfo.getContentEncoding(), itemInfo.getSize()); + this.config = config; + } + + protected void initMetadata(@Nullable String encoding, long sizeFromMetadata) throws IOException { + gzipEncoded = nullToEmpty(encoding).contains(GZIP_ENCODING); + if (gzipEncoded && !config.isGzipEncodingSupportEnabled()) { + throw new IOException( + "Cannot read GZIP encoded files - content encoding support is disabled."); + } + objectSize = gzipEncoded ? Long.MAX_VALUE : sizeFromMetadata; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + throwIfNotOpen(); + + // Don't try to read if the buffer has no space. + if (dst.remaining() == 0) { + return 0; + } + LOG.trace( + "Reading {} bytes at {} position from '{}'", dst.remaining(), currentPosition, resourceId); + if (currentPosition == objectSize) { + return -1; + } + return contentReadChannel.readContent(dst); + } + + @Override + public int write(ByteBuffer src) throws IOException { + throw new UnsupportedOperationException("Cannot mutate read-only channel"); + } + + @Override + public long position() throws IOException { + return currentPosition; + } + + /** + * Sets this channel's position. + * + * <p>This method will throw an exception if {@code newPosition} is greater than object size, + * which contradicts {@link SeekableByteChannel#position(long) SeekableByteChannel} contract. + * TODO(user): decide if this needs to be fixed. + * + * @param newPosition the new position, counting the number of bytes from the beginning. + * @return this channel instance + * @throws FileNotFoundException if the underlying object does not exist. + * @throws IOException on IO error + */ + @Override + public SeekableByteChannel position(long newPosition) throws IOException { + throwIfNotOpen(); + + if (newPosition == currentPosition) { + return this; + } + + validatePosition(newPosition); + LOG.trace( + "Seek from {} to {} position for '{}'", currentPosition, newPosition, resourceId); + currentPosition = newPosition; + return this; + } + + @Override + public long size() throws IOException { + return objectSize; + } + + @Override + public SeekableByteChannel truncate(long size) throws IOException { + throw new UnsupportedOperationException("Cannot mutate read-only channel"); + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public void close() throws IOException { + if (open) { + try { + LOG.trace("Closing channel for '{}'", resourceId); + contentReadChannel.closeContentChannel(); + } catch (Exception e) { + throw new IOException( + String.format("Exception occurred while closing channel '%s'", resourceId), e); + } finally { + contentReadChannel = null; + open = false; + } + } + } + + /** + * This class own the responsibility of opening up contentChannel. It also implements the Fadvise, + * which helps in deciding the boundaries of content channel being opened and also caching the + * footer of an object. + */ + private class ContentReadChannel { + + // Size of buffer to allocate for skipping bytes in-place when performing in-place seeks. 
+ private static final int SKIP_BUFFER_SIZE = 8192; + private final BlobId blobId; + + // This is the actual current position in `contentChannel` from where read can happen. + // This remains unchanged of position(long) method call. + private long contentChannelCurrentPosition = -1; + private long contentChannelEnd = -1; + // Prefetched footer content. + private byte[] footerContent; + // Used as scratch space when reading bytes just to discard them when trying to perform small + // in-place seeks. + private byte[] skipBuffer = null; + private ReadableByteChannel byteChannel = null; + private final FileAccessPatternManager fileAccessManager; + + ContentReadChannel( + GoogleHadoopFileSystemConfiguration config, StorageResourceId resourceId) { + this.blobId = + BlobId.of( + resourceId.getBucketName(), resourceId.getObjectName(), resourceId.getGenerationId()); + this.fileAccessManager = new FileAccessPatternManager(resourceId, config); + if (gzipEncoded) { + fileAccessManager.overrideAccessPattern(false); + } + } + + int readContent(ByteBuffer dst) throws IOException { + + performPendingSeeks(); + + checkState( + contentChannelCurrentPosition == currentPosition || byteChannel == null, + "contentChannelCurrentPosition (%s) should be equal to currentPosition " + + "(%s) after lazy seek, if channel is open", + contentChannelCurrentPosition, + currentPosition); + + int totalBytesRead = 0; + // We read from a streaming source. We may not get all the bytes we asked for + // in the first read. Therefore, loop till we either read the required number of + // bytes or we reach end-of-stream. + while (dst.hasRemaining()) { + int remainingBeforeRead = dst.remaining(); + try { + if (byteChannel == null) { + byteChannel = openByteChannel(dst.remaining()); + // We adjust the start index of content channel in following cases + // 1. request range is in footer boundaries --> request the whole footer + // 2. requested content is gzip encoded -> request always from start of file. + // Case(1) is handled with reading and caching the extra read bytes, for all other cases + // we need to skip all the unrequested bytes before start reading from current position. + if (currentPosition > contentChannelCurrentPosition) { + skipInPlace(); + } + // making sure that currentPosition is in alignment with currentReadPosition before + // actual read starts to avoid read discrepancies. + checkState( + contentChannelCurrentPosition == currentPosition, + "position of read offset isn't in alignment with channel's read offset"); + } + int bytesRead = byteChannel.read(dst); + + /* + As we are using the zero copy implementation of byteChannel, + it can return even zero bytes, + while reading, + we should not treat it as an error scenario anymore. + */ + if (bytesRead == 0) { + LOG.trace( + "Read {} from storage-client's byte channel at position: {} with channel " + + "ending at: {} for resourceId: {} of size: {}", + bytesRead, currentPosition, contentChannelEnd, resourceId, objectSize); + } + + if (bytesRead < 0) { + // Because we don't know decompressed object size for gzip-encoded objects, + // assume that this is an object end. 
+ if (gzipEncoded) { + objectSize = currentPosition; + contentChannelEnd = currentPosition; + } + + if (currentPosition != contentChannelEnd && currentPosition != objectSize) { + throw new IOException( + String.format( + "Received end of stream result before all requestedBytes were received;" + + "EndOf stream signal received at offset: %d where as stream was " + + "suppose to end at: %d for resource: %s of size: %d", + currentPosition, contentChannelEnd, resourceId, objectSize)); + } + // If we have reached an end of a contentChannel but not an end of an object. + // then close contentChannel and continue reading an object if necessary. + if (contentChannelEnd != objectSize && currentPosition == contentChannelEnd) { + closeContentChannel(); + continue; + } else { + break; + } + } + totalBytesRead += bytesRead; + currentPosition += bytesRead; + contentChannelCurrentPosition += bytesRead; + checkState( + contentChannelCurrentPosition == currentPosition, + "contentChannelPosition (%s) should be equal to currentPosition (%s)" + + " after successful read", + contentChannelCurrentPosition, + currentPosition); + } catch (Exception e) { + int partialBytes = partiallyReadBytes(remainingBeforeRead, dst); + currentPosition += partialBytes; + contentChannelCurrentPosition += partialBytes; + LOG.trace( + "Closing contentChannel after {} exception for '{}'.", e.getMessage(), resourceId); + closeContentChannel(); + throw convertError(e); + } + } + return totalBytesRead; + } + + private int partiallyReadBytes(int remainingBeforeRead, ByteBuffer dst) { + int partialReadBytes = 0; + if (remainingBeforeRead != dst.remaining()) { + partialReadBytes = remainingBeforeRead - dst.remaining(); + } + return partialReadBytes; + } + + private ReadableByteChannel openByteChannel(long bytesToRead) throws IOException { + checkArgument( + bytesToRead > 0, "bytesToRead should be greater than 0, but was %s", bytesToRead); + checkState( + byteChannel == null && contentChannelEnd < 0, + "contentChannel and contentChannelEnd should be not initialized yet for '%s'", + resourceId); + + if (footerContent != null && currentPosition >= objectSize - footerContent.length) { + return serveFooterContent(); + } + + // Should be updated only if content is not served from cached footer + fileAccessManager.updateAccessPattern(currentPosition); + + setChannelBoundaries(bytesToRead); + + ReadableByteChannel readableByteChannel = + getStorageReadChannel(contentChannelCurrentPosition, contentChannelEnd); + + if (contentChannelEnd == objectSize + && (contentChannelEnd - contentChannelCurrentPosition) + <= config.getMinRangeRequestSize()) { + + if (footerContent == null) { + cacheFooter(readableByteChannel); + } + return serveFooterContent(); + } + return readableByteChannel; + } + + private void setChannelBoundaries(long bytesToRead) { + contentChannelCurrentPosition = getRangeRequestStart(); + contentChannelEnd = getRangeRequestEnd(contentChannelCurrentPosition, bytesToRead); + checkState( + contentChannelEnd >= contentChannelCurrentPosition, + String.format( + "Start position should be <= endPosition startPosition:%d, endPosition: %d", + contentChannelCurrentPosition, contentChannelEnd)); + } + + private void cacheFooter(ReadableByteChannel readableByteChannel) throws IOException { + int footerSize = toIntExact(objectSize - contentChannelCurrentPosition); + footerContent = new byte[footerSize]; + try (InputStream footerStream = Channels.newInputStream(readableByteChannel)) { + int totalBytesRead = 0; + int bytesRead; + do { + bytesRead = 
footerStream.read(footerContent, totalBytesRead, footerSize - totalBytesRead); + if (bytesRead >= 0) { + totalBytesRead += bytesRead; + } + } while (bytesRead >= 0 && totalBytesRead < footerSize); + checkState( + bytesRead >= 0, + "footerStream shouldn't be empty before reading the footer of size %s, " + + "totalBytesRead %s, read via last call %s, for '%s'", + footerSize, + totalBytesRead, + bytesRead, + resourceId); + checkState( + totalBytesRead == footerSize, + "totalBytesRead (%s) should equal footerSize (%s) for '%s'", + totalBytesRead, + footerSize, + resourceId); + } catch (Exception e) { + footerContent = null; + throw e; + } + LOG.trace("Prefetched {} bytes footer for '{}'", footerContent.length, resourceId); + } + + private ReadableByteChannel serveFooterContent() { + contentChannelCurrentPosition = currentPosition; + int offset = toIntExact(currentPosition - (objectSize - footerContent.length)); + int length = footerContent.length - offset; + LOG.trace( + "Opened channel (prefetched footer) from {} position for '{}'", + currentPosition, resourceId); + return Channels.newChannel(new ByteArrayInputStream(footerContent, offset, length)); + } + + private long getRangeRequestStart() { + if (gzipEncoded) { + return 0; + } + if (config.getFadvise() != Fadvise.SEQUENTIAL + && isFooterRead() + && !config.isReadExactRequestedBytesEnabled()) { + // Prefetch footer and adjust start position to footerStart. + return max(0, objectSize - config.getMinRangeRequestSize()); + } + return currentPosition; + } + + private long getRangeRequestEnd(long startPosition, long bytesToRead) { + // Always read gzip-encoded files till the end - they do not support range reads. + if (gzipEncoded) { + return objectSize; + } + long endPosition = objectSize; + if (fileAccessManager.shouldAdaptToRandomAccess()) { + // opening a channel for whole object doesn't make sense as anyhow it will not be utilized + // for further reads. + endPosition = startPosition + max(bytesToRead, config.getMinRangeRequestSize()); + } else { + if (config.getFadvise() == Fadvise.AUTO_RANDOM) { + endPosition = min(startPosition + config.getBlockSize(), objectSize); + } + } + + if (footerContent != null) { + // If footer is cached open just till footerStart. + // Remaining content ill be served from cached footer itself. 
+ endPosition = min(endPosition, objectSize - footerContent.length); + } + return endPosition; + } + + void closeContentChannel() { + if (byteChannel != null) { + LOG.trace("Closing internal contentChannel for '{}'", resourceId); + try { + byteChannel.close(); + } catch (Exception e) { + LOG.trace( + "Got an exception on contentChannel.close() for '{}'; ignoring it.", resourceId, e); + } finally { + byteChannel = null; + fileAccessManager.updateLastServedIndex(contentChannelCurrentPosition); + reset(); + } + } + } + + private void reset() { + checkState(byteChannel == null, "contentChannel should be null for '%s'", resourceId); + contentChannelCurrentPosition = -1; + contentChannelEnd = -1; + } + + private boolean isInRangeSeek() { + long seekDistance = currentPosition - contentChannelCurrentPosition; + if (byteChannel != null + && seekDistance > 0 + // for gzip encoded content always seek in place + && (gzipEncoded || seekDistance <= config.getInplaceSeekLimit()) + && currentPosition < contentChannelEnd) { + return true; + } + return false; + } + + private void skipInPlace() { + if (skipBuffer == null) { + skipBuffer = new byte[SKIP_BUFFER_SIZE]; + } + long seekDistance = currentPosition - contentChannelCurrentPosition; + while (seekDistance > 0 && byteChannel != null) { + try { + int bufferSize = toIntExact(min(skipBuffer.length, seekDistance)); + int bytesRead = byteChannel.read(ByteBuffer.wrap(skipBuffer, 0, bufferSize)); + if (bytesRead < 0) { + LOG.info( + "Somehow read {} bytes trying to skip {} bytes to seek to position {}, size: {}", + bytesRead, seekDistance, currentPosition, objectSize); + closeContentChannel(); + } else { + seekDistance -= bytesRead; + contentChannelCurrentPosition += bytesRead; + } + } catch (Exception e) { + LOG.info( + "Got an IO exception on contentChannel.read(), a lazy-seek will be pending for '{}'", + resourceId, e); + closeContentChannel(); + } + } + checkState( + byteChannel == null || contentChannelCurrentPosition == currentPosition, + "contentChannelPosition (%s) should be equal to currentPosition (%s)" + + " after successful in-place skip", + contentChannelCurrentPosition, + currentPosition); + } + + private void performPendingSeeks() { + + // Return quickly if there is no pending seek operation, i.e. position didn't change. 
+ if (currentPosition == contentChannelCurrentPosition && byteChannel != null) { + return; + } + + LOG.trace( + "Performing lazySeek from {} to {} position '{}'", + contentChannelCurrentPosition, currentPosition, resourceId); + + if (isInRangeSeek()) { + skipInPlace(); + } else { + // close existing contentChannel as requested bytes can't be served from current + // contentChannel; + closeContentChannel(); + } + } + + private ReadableByteChannel getStorageReadChannel(long seek, long limit) throws IOException { + ReadChannel readChannel = storage.reader(blobId, generateReadOptions()); + try { + readChannel.seek(seek); + readChannel.limit(limit); + // bypass the storage-client caching layer hence eliminates the need to maintain a copy of + // chunk + readChannel.setChunkSize(0); + return readChannel; + } catch (Exception e) { + throw new IOException( + String.format( + "Unable to update the boundaries/Range of contentChannel %s", + resourceId.toString()), + e); + } + } + + private BlobSourceOption[] generateReadOptions() { + List<BlobSourceOption> blobReadOptions = new ArrayList<>(); + // To get decoded content + blobReadOptions.add(BlobSourceOption.shouldReturnRawInputStream(false)); + + if (blobId.getGeneration() != null) { + blobReadOptions.add(BlobSourceOption.generationMatch(blobId.getGeneration())); + } + + // TODO: Add support for encryptionKey + return blobReadOptions.toArray(new BlobSourceOption[blobReadOptions.size()]); + } + + private boolean isFooterRead() { + return objectSize - currentPosition <= config.getMinRangeRequestSize(); + } + } + + private static void validate(GoogleCloudStorageItemInfo itemInfo) throws IOException { + checkNotNull(itemInfo, "itemInfo cannot be null"); + StorageResourceId resourceId = itemInfo.getResourceId(); + checkArgument( + resourceId.isStorageObject(), "Can not open a non-file object for read: %s", resourceId); + if (!itemInfo.exists()) { + throw new FileNotFoundException(String.format("Item not found: %s", resourceId)); + } + } + + private IOException convertError(Exception error) { + String msg = String.format("Error reading '%s'", resourceId); + switch (ErrorTypeExtractor.getErrorType(error)) { + case NOT_FOUND: + return createFileNotFoundException( + resourceId.getBucketName(), resourceId.getObjectName(), new IOException(msg, error)); + case OUT_OF_RANGE: + return (IOException) new EOFException(msg).initCause(error); + default: + return new IOException(msg, error); + } + } + + /** Validates that the given position is valid for this channel. */ + private void validatePosition(long position) throws IOException { + if (position < 0) { + throw new EOFException( + String.format( + "Invalid seek offset: position value (%d) must be >= 0 for '%s'", + position, resourceId)); + } + + if (objectSize >= 0 && position >= objectSize) { + throw new EOFException( + String.format( + "Invalid seek offset: position value (%d) must be between 0 and %d for '%s'", + position, objectSize, resourceId)); + } + } + + /** Throws if this channel is not currently open. 
*/ + private void throwIfNotOpen() throws IOException { + if (!isOpen()) { + throw new ClosedChannelException(); + } + } +} diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageExceptions.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageExceptions.java new file mode 100644 index 00000000000..95f0e41617c --- /dev/null +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageExceptions.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.gs; + +import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument; +import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.isNullOrEmpty; +import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.nullToEmpty; + +import java.io.FileNotFoundException; +import java.io.IOException; +import javax.annotation.Nullable; + +/** + * Miscellaneous helper methods for standardizing the types of exceptions thrown by the various + * GCS-based FileSystems. + */ +final class GoogleCloudStorageExceptions { + + private GoogleCloudStorageExceptions() {} + + /** Creates FileNotFoundException with suitable message for a GCS bucket or object. */ + static FileNotFoundException createFileNotFoundException( + String bucketName, String objectName, @Nullable IOException cause) { + checkArgument(!isNullOrEmpty(bucketName), "bucketName must not be null or empty"); + FileNotFoundException fileNotFoundException = + new FileNotFoundException( + String.format( + "Item not found: '%s'. 
Note, it is possible that the live version" + + " is still available but the requested generation is deleted.", + StringPaths.fromComponents(bucketName, nullToEmpty(objectName)))); + if (cause != null) { + fileNotFoundException.initCause(cause); + } + return fileNotFoundException; + } + + static FileNotFoundException createFileNotFoundException( + StorageResourceId resourceId, @Nullable IOException cause) { + return createFileNotFoundException( + resourceId.getBucketName(), resourceId.getObjectName(), cause); + } +} diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java index aa1617e4da6..2b0c238eb02 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java @@ -26,6 +26,7 @@ import com.google.auth.Credentials; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,13 +34,20 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; +import java.nio.channels.SeekableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.DirectoryNotEmptyException; import java.nio.file.FileAlreadyExistsException; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.TreeMap; +import java.util.regex.Pattern; +import javax.annotation.Nullable; /** * Provides FS semantics over GCS based on Objects API. @@ -73,6 +81,7 @@ class GoogleCloudStorageFileSystem { // URI of the root path. static final URI GCSROOT = URI.create(SCHEME + ":/"); + private final GoogleHadoopFileSystemConfiguration configuration; // GCS access instance. private GoogleCloudStorage gcs; @@ -87,6 +96,7 @@ private static GoogleCloudStorage createCloudStorage( GoogleCloudStorageFileSystem(final GoogleHadoopFileSystemConfiguration configuration, final Credentials credentials) throws IOException { + this.configuration = configuration; gcs = createCloudStorage(configuration, credentials); } @@ -157,9 +167,7 @@ private GoogleCloudStorageItemInfo getFileInfoInternal( resourceId.getBucketName(), resourceId.getObjectName(), GET_FILE_INFO_LIST_OPTIONS); - LOG.trace("List for getMetadata returned {}. 
{}", listDirResult.size(), listDirResult); if (!listDirResult.isEmpty()) { - LOG.trace("Get metadata for directory returned non empty {}", listDirResult); return GoogleCloudStorageItemInfo.createInferredDirectory(resourceId.toDirectoryId()); } } @@ -371,4 +379,314 @@ public List<FileInfo> listFileInfo(URI path, ListFileOptions listOptions) throws fileInfos.sort(FILE_INFO_PATH_COMPARATOR); return fileInfos; } + + FileInfo getFileInfoObject(URI path) throws IOException { + checkArgument(path != null, "path must not be null"); + StorageResourceId resourceId = StorageResourceId.fromUriPath(path, true); + checkArgument( + !resourceId.isDirectory(), + String.format( + "path must be an object and not a directory, path: %s, resourceId: %s", + path, resourceId)); + FileInfo fileInfo = FileInfo.fromItemInfo(gcs.getItemInfo(resourceId)); + LOG.trace("getFileInfoObject(path: {}): {}", path, fileInfo); + return fileInfo; + } + + SeekableByteChannel open(FileInfo fileInfo, GoogleHadoopFileSystemConfiguration config) + throws IOException { + checkNotNull(fileInfo, "fileInfo should not be null"); + checkArgument( + !fileInfo.isDirectory(), "Cannot open a directory for reading: %s", fileInfo.getPath()); + + return gcs.open(fileInfo.getItemInfo(), config); + } + + void rename(URI src, URI dst) throws IOException { + LOG.trace("rename(src: {}, dst: {})", src, dst); + checkNotNull(src); + checkNotNull(dst); + checkArgument(!src.equals(GCSROOT), "Root path cannot be renamed."); + + // Parent of the destination path. + URI dstParent = UriPaths.getParentPath(dst); + + // Obtain info on source, destination and destination-parent. + List<URI> paths = new ArrayList<>(); + paths.add(src); + paths.add(dst); + if (dstParent != null) { + // dstParent is null if dst is GCS_ROOT. + paths.add(dstParent); + } + List<FileInfo> fileInfos = getFileInfos(paths); + FileInfo srcInfo = fileInfos.get(0); + FileInfo dstInfo = fileInfos.get(1); + FileInfo dstParentInfo = dstParent == null ? null : fileInfos.get(2); + + // Throw if the source file does not exist. + if (!srcInfo.exists()) { + throw new FileNotFoundException("Item not found: " + src); + } + + // Make sure paths match what getFileInfo() returned (it can add / at the end). + src = srcInfo.getPath(); + dst = getDstUri(srcInfo, dstInfo, dstParentInfo); + + // if src and dst are equal then do nothing + if (src.equals(dst)) { + return; + } + + if (srcInfo.isDirectory()) { + renameDirectoryInternal(srcInfo, dst); + } else { + renameObject(src, dst, srcInfo); + } + } + + private void renameObject(URI src, URI dst, FileInfo srcInfo) throws IOException { + StorageResourceId srcResourceId = + StorageResourceId.fromUriPath(src, /* allowEmptyObjectName= */ true); + StorageResourceId dstResourceId = StorageResourceId.fromUriPath( + dst, + /* allowEmptyObjectName= */ true, + /* generationId= */ 0L); + + if (srcResourceId.getBucketName().equals(dstResourceId.getBucketName())) { + gcs.move( + ImmutableMap.of( + new StorageResourceId( + srcInfo.getItemInfo().getBucketName(), + srcInfo.getItemInfo().getObjectName(), + srcInfo.getItemInfo().getContentGeneration()), + dstResourceId)); + } else { + gcs.copy(ImmutableMap.of(srcResourceId, dstResourceId)); + + gcs.deleteObjects( + ImmutableList.of( + new StorageResourceId( + srcInfo.getItemInfo().getBucketName(), + srcInfo.getItemInfo().getObjectName(), + srcInfo.getItemInfo().getContentGeneration()))); + } + } + + /** + * Renames given directory without checking any parameters. 
+ * + * <p>GCS does not support atomic renames therefore rename is implemented as copying source + * metadata to destination and then deleting source metadata. Note that only the metadata is + * copied and not the content of any file. + */ + private void renameDirectoryInternal(FileInfo srcInfo, URI dst) throws IOException { + checkArgument(srcInfo.isDirectory(), "'%s' should be a directory", srcInfo); + checkArgument(dst.toString().endsWith(PATH_DELIMITER), "'%s' should be a directory", dst); + + URI src = srcInfo.getPath(); + + // Mapping from each src to its respective dst. + // Sort src items so that parent directories appear before their children. + // That allows us to copy parent directories before we copy their children. + Map<FileInfo, URI> srcToDstItemNames = new TreeMap<>(FILE_INFO_PATH_COMPARATOR); + Map<FileInfo, URI> srcToDstMarkerItemNames = new TreeMap<>(FILE_INFO_PATH_COMPARATOR); + + // List of individual paths to rename; + // we will try to carry out the copies in this list's order. + List<FileInfo> srcItemInfos = + listFileInfoForPrefix(src, ListFileOptions.DELETE_RENAME_LIST_OPTIONS); + + // Create a list of sub-items to copy. + Pattern markerFilePattern = configuration.getMarkerFilePattern(); + String prefix = src.toString(); + for (FileInfo srcItemInfo : srcItemInfos) { + String relativeItemName = srcItemInfo.getPath().toString().substring(prefix.length()); + URI dstItemName = dst.resolve(relativeItemName); + if (markerFilePattern != null && markerFilePattern.matcher(relativeItemName).matches()) { + srcToDstMarkerItemNames.put(srcItemInfo, dstItemName); + } else { + srcToDstItemNames.put(srcItemInfo, dstItemName); + } + } + + StorageResourceId srcResourceId = + StorageResourceId.fromUriPath(src, /* allowEmptyObjectName= */ true); + StorageResourceId dstResourceId = + StorageResourceId.fromUriPath( + dst, /* allowEmptyObjectName= */ true, /* generationId= */ 0L); + if (srcResourceId.getBucketName().equals(dstResourceId.getBucketName())) { + // First, move all items except marker items + moveInternal(srcToDstItemNames); + // Finally, move marker items (if any) to mark rename operation success + moveInternal(srcToDstMarkerItemNames); + + if (srcInfo.getItemInfo().isBucket()) { + deleteBucket(Collections.singletonList(srcInfo)); + } else { + // If src is a directory then srcItemInfos does not contain its own name, + // we delete item separately in the list. + deleteObjects(Collections.singletonList(srcInfo)); + } + return; + } + + // TODO: Add support for across bucket moves + throw new UnsupportedOperationException(String.format( + "Moving object from bucket '%s' to '%s' is not supported", + srcResourceId.getBucketName(), + dstResourceId.getBucketName())); + } + + List<FileInfo> listFileInfoForPrefix(URI prefix, ListFileOptions listOptions) + throws IOException { + LOG.trace("listAllFileInfoForPrefix(prefix: {})", prefix); + StorageResourceId prefixId = getPrefixId(prefix); + List<GoogleCloudStorageItemInfo> itemInfos = + gcs.listObjectInfo( + prefixId.getBucketName(), + prefixId.getObjectName(), + updateListObjectOptions(ListObjectOptions.DEFAULT_FLAT_LIST, listOptions)); + List<FileInfo> fileInfos = FileInfo.fromItemInfos(itemInfos); + fileInfos.sort(FILE_INFO_PATH_COMPARATOR); + return fileInfos; + } + + /** Moves items in given map that maps source items to destination items. 
*/ + private void moveInternal(Map<FileInfo, URI> srcToDstItemNames) throws IOException { + if (srcToDstItemNames.isEmpty()) { + return; + } + + Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap = new HashMap<>(); + + // Prepare list of items to move. + for (Map.Entry<FileInfo, URI> srcToDstItemName : srcToDstItemNames.entrySet()) { + StorageResourceId srcResourceId = srcToDstItemName.getKey().getItemInfo().getResourceId(); + + StorageResourceId dstResourceId = + StorageResourceId.fromUriPath(srcToDstItemName.getValue(), true); + sourceToDestinationObjectsMap.put(srcResourceId, dstResourceId); + } + + // Perform move. + gcs.move(sourceToDestinationObjectsMap); + } + + private static ListObjectOptions updateListObjectOptions( + ListObjectOptions listObjectOptions, ListFileOptions listFileOptions) { + return listObjectOptions.builder().setFields(listFileOptions.getFields()).build(); + } + + private List<FileInfo> getFileInfos(List<URI> paths) throws IOException { + List<FileInfo> result = new ArrayList<>(paths.size()); + for (URI path : paths) { + // TODO: Do this concurrently + result.add(getFileInfo(path)); + } + + return result; + } + + private URI getDstUri(FileInfo srcInfo, FileInfo dstInfo, @Nullable FileInfo dstParentInfo) + throws IOException { + URI src = srcInfo.getPath(); + URI dst = dstInfo.getPath(); + + // Throw if src is a file and dst == GCS_ROOT + if (!srcInfo.isDirectory() && dst.equals(GCSROOT)) { + throw new IOException("A file cannot be created in root."); + } + + // Throw if the destination is a file that already exists, and it's not a source file. + if (dstInfo.exists() && !dstInfo.isDirectory() && (srcInfo.isDirectory() || !dst.equals(src))) { + throw new IOException("Cannot overwrite an existing file: " + dst); + } + + // Rename operation cannot be completed if parent of destination does not exist. + if (dstParentInfo != null && !dstParentInfo.exists()) { + throw new IOException( + "Cannot rename because path does not exist: " + dstParentInfo.getPath()); + } + + // Leaf item of the source path. + String srcItemName = getItemName(src); + + // Having taken care of the initial checks, apply the regular rules. + // After applying the rules, we will be left with 2 paths such that: + // -- either both are files or both are directories + // -- src exists and dst leaf does not exist + if (srcInfo.isDirectory()) { + // -- if src is a directory + // -- dst is an existing file => disallowed + // -- dst is a directory => rename the directory. + + // The first case (dst is an existing file) is already checked earlier. + // If the destination path looks like a file, make it look like a + // directory path. This is because users often type 'mv foo bar' + // rather than 'mv foo bar/'. + if (!dstInfo.isDirectory()) { + dst = UriPaths.toDirectory(dst); + } + + // Throw if renaming directory to self - this is forbidden + if (src.equals(dst)) { + throw new IOException("Rename dir to self is forbidden"); + } + + URI dstRelativeToSrc = src.relativize(dst); + // Throw if dst URI relative to src is not equal to dst, + // because this means that src is a parent directory of dst + // and src cannot be "renamed" to its subdirectory + if (!dstRelativeToSrc.equals(dst)) { + throw new IOException("Rename to subdir is forbidden"); + } + + if (dstInfo.exists()) { + dst = + dst.equals(GCSROOT) + ? 
UriPaths.fromStringPathComponents( + srcItemName, /* objectName= */ null, /* allowEmptyObjectName= */ true) + : UriPaths.toDirectory(dst.resolve(srcItemName)); + } + } else { + // -- src is a file + // -- dst is a file => rename the file. + // -- dst is a directory => similar to the previous case after + // appending src file-name to dst + + if (dstInfo.isDirectory()) { + if (!dstInfo.exists()) { + throw new IOException("Cannot rename because path does not exist: " + dstInfo.getPath()); + } else { + dst = dst.resolve(srcItemName); + } + } + } + + return dst; + } + + @Nullable + static String getItemName(URI path) { + checkNotNull(path, "path can not be null"); + + // There is no leaf item for the root path. + if (path.equals(GCSROOT)) { + return null; + } + + StorageResourceId resourceId = StorageResourceId.fromUriPath(path, true); + + if (resourceId.isBucket()) { + return resourceId.getBucketName(); + } + + String objectName = resourceId.getObjectName(); + int index = + StringPaths.isDirectoryPath(objectName) + ? objectName.lastIndexOf(PATH_DELIMITER, objectName.length() - 2) + : objectName.lastIndexOf(PATH_DELIMITER); + return index < 0 ? objectName : objectName.substring(index + 1); + } } diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFSInputStream.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFSInputStream.java new file mode 100644 index 00000000000..26629fc79b2 --- /dev/null +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFSInputStream.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.gs; + +import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkNotNull; +import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SeekableByteChannel; +import javax.annotation.Nonnull; +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileSystem; + +final class GoogleHadoopFSInputStream extends FSInputStream { + public static final Logger LOG = LoggerFactory.getLogger(GoogleHadoopFSInputStream.class); + + // Used for single-byte reads. + private final byte[] singleReadBuf = new byte[1]; + + // Path of the file to read. + private final URI gcsPath; + // File Info of gcsPath, will be pre-populated in some cases i.e. when Json client is used and + // failFast is disabled. + + // All store IO access goes through this. 
+ private final SeekableByteChannel channel; + // Number of bytes read through this channel. + private long totalBytesRead = 0; + + /** + * Closed bit. Volatile so reads are non-blocking. Updates must be in a synchronized block to + * guarantee an atomic check and set + */ + private volatile boolean closed; + + // Statistics tracker provided by the parent GoogleHadoopFileSystem for recording stats + private final FileSystem.Statistics statistics; + + static GoogleHadoopFSInputStream create( + GoogleHadoopFileSystem ghfs, URI gcsPath, FileSystem.Statistics statistics) + throws IOException { + LOG.trace("create(gcsPath: {})", gcsPath); + GoogleCloudStorageFileSystem gcsFs = ghfs.getGcsFs(); + FileInfo fileInfo = gcsFs.getFileInfoObject(gcsPath); + SeekableByteChannel channel = gcsFs.open(fileInfo, ghfs.getFileSystemConfiguration()); + return new GoogleHadoopFSInputStream(gcsPath, channel, statistics); + } + + private GoogleHadoopFSInputStream( + URI gcsPath, + SeekableByteChannel channel, + FileSystem.Statistics statistics) { + LOG.trace("GoogleHadoopFSInputStream(gcsPath: %s)", gcsPath); + this.gcsPath = gcsPath; + this.channel = channel; + this.statistics = statistics; + } + + @Override + public synchronized int read() throws IOException { + checkNotClosed(); + int numRead = read(singleReadBuf, /* offset= */ 0, /* length= */ 1); + checkState( + numRead == -1 || numRead == 1, + "Read %s bytes using single-byte buffer for path %s ending in position %s", + numRead, + gcsPath, + channel.position()); + return numRead > 0 ? singleReadBuf[0] & 0xff : numRead; + } + + @Override + public synchronized int read(@Nonnull byte[] buf, int offset, int length) throws IOException { + checkNotClosed(); + checkNotNull(buf, "buf must not be null"); + if (offset < 0 || length < 0 || length > buf.length - offset) { + throw new IndexOutOfBoundsException(); + } + + // TODO(user): Wrap this in a while-loop if we ever introduce a non-blocking mode for + // the underlying channel. + int numRead = channel.read(ByteBuffer.wrap(buf, offset, length)); + if (numRead > 0) { + // -1 means we actually read 0 bytes, but requested at least one byte. + totalBytesRead += numRead; + statistics.incrementBytesRead(numRead); + statistics.incrementReadOps(1); + } + return numRead; + } + + @Override + public synchronized void seek(long pos) throws IOException { + checkNotClosed(); + LOG.trace("seek({})", pos); + try { + channel.position(pos); + } catch (IllegalArgumentException e) { + throw new IOException(e); + } + } + + @Override + public synchronized void close() throws IOException { + if (!closed) { + closed = true; + + LOG.trace("close(): {}", gcsPath); + try { + if (channel != null) { + LOG.trace( + "Closing '{}' file with {} total bytes read", gcsPath, totalBytesRead); + channel.close(); + } + } catch (Exception e) { + LOG.warn("Error while closing underneath read channel resources for path: {}", gcsPath, e); + } + } + } + + /** + * Gets the current position within the file being read. + * + * @return The current position within the file being read. + * @throws IOException if an IO error occurs. + */ + @Override + public synchronized long getPos() throws IOException { + checkNotClosed(); + long pos = channel.position(); + LOG.trace("getPos(): {}", pos); + return pos; + } + + /** + * Seeks a different copy of the data. Not supported. + * + * @return true if a new source is found, false otherwise. 
+ */ + @Override + public boolean seekToNewSource(long targetPos) { + LOG.trace("seekToNewSource({}): false", targetPos); + return false; + } + + @Override + public int available() throws IOException { + if (!channel.isOpen()) { + throw new ClosedChannelException(); + } + return super.available(); + } + + /** + * Verify that the input stream is open. Non-blocking; this gives the last state of the volatile + * {@link #closed} field. + * + * @throws IOException if the connection is closed. + */ + private void checkNotClosed() throws IOException { + if (closed) { + throw new IOException(gcsPath + ": " + FSExceptionMessages.STREAM_IS_CLOSED); + } + } +} diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java index 8831568a356..8bf2f057724 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java @@ -265,9 +265,10 @@ public String getScheme() { } @Override - public FSDataInputStream open(final Path path, final int bufferSize) throws IOException { - LOG.trace("open({})", path); - throw new UnsupportedOperationException(path.toString()); + public FSDataInputStream open(final Path hadoopPath, final int bufferSize) throws IOException { + LOG.trace("open({})", hadoopPath); + URI gcsPath = getGcsPath(hadoopPath); + return new FSDataInputStream(GoogleHadoopFSInputStream.create(this, gcsPath, statistics)); } @Override @@ -327,9 +328,37 @@ public FSDataOutputStream append(final Path path, final int i, final Progressabl } @Override - public boolean rename(final Path path, final Path path1) throws IOException { - LOG.trace("rename({}, {})", path, path1); - throw new UnsupportedOperationException(path.toString()); + public boolean rename(final Path src, final Path dst) throws IOException { + LOG.trace("rename({}, {})", src, dst); + + checkArgument(src != null, "src must not be null"); + checkArgument(dst != null, "dst must not be null"); + + // Even though the underlying GCSFS will also throw an IAE if src is root, since our filesystem + // root happens to equal the global root, we want to explicitly check it here since derived + // classes may not have filesystem roots equal to the global root. 
+ if (this.makeQualified(src).equals(fsRoot)) { + LOG.trace("rename(src: {}, dst: {}): false [src is a root]", src, dst); + return false; + } + + try { + checkOpen(); + + URI srcPath = getGcsPath(src); + URI dstPath = getGcsPath(dst); + getGcsFs().rename(srcPath, dstPath); + + LOG.trace("rename(src: {}, dst: {}): true", src, dst); + } catch (IOException e) { + if (ApiErrorExtractor.INSTANCE.requestFailure(e)) { + throw e; + } + LOG.trace("rename(src: %s, dst: %s): false [failed]", src, dst, e); + return false; + } + + return true; } @Override @@ -468,8 +497,7 @@ public Path getWorkingDirectory() { public boolean mkdirs(final Path hadoopPath, final FsPermission permission) throws IOException { checkArgument(hadoopPath != null, "hadoopPath must not be null"); - LOG.trace( - "mkdirs(hadoopPath: {}, permission: {}): true", hadoopPath, permission); + LOG.trace("mkdirs(hadoopPath: {}, permission: {}): true", hadoopPath, permission); checkOpen(); @@ -582,7 +610,6 @@ public void setWorkingDirectory(final Path hadoopPath) { LOG.trace("setWorkingDirectory(hadoopPath: {}): {}", hadoopPath, workingDirectory); } - private static String getUgiUserName() throws IOException { UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); return ugi.getShortUserName(); diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java index a480a72e60b..20831885fe6 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java @@ -18,6 +18,8 @@ package org.apache.hadoop.fs.gs; +import java.util.regex.Pattern; + import static java.lang.Math.toIntExact; import org.apache.hadoop.conf.Configuration; @@ -26,6 +28,8 @@ * This class provides a configuration for the {@link GoogleHadoopFileSystem} implementations. */ class GoogleHadoopFileSystemConfiguration { + private static final Long GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT_DEFAULT = 8 * 1024 * 1024L; + /** * Configuration key for default block size of a file. * @@ -55,23 +59,79 @@ class GoogleHadoopFileSystemConfiguration { static final HadoopConfigurationProperty<Long> GCS_OUTPUT_STREAM_BUFFER_SIZE = new HadoopConfigurationProperty<>("fs.gs.outputstream.buffer.size", 8L * 1024 * 1024); + + /** + * If forward seeks are within this many bytes of the current position, seeks are performed by + * reading and discarding bytes in-place rather than opening a new underlying stream. + */ + public static final HadoopConfigurationProperty<Long> GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT = + new HadoopConfigurationProperty<>( + "fs.gs.inputstream.inplace.seek.limit", + GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT_DEFAULT); + + /** Tunes reading objects behavior to optimize HTTP GET requests for various use cases. */ + public static final HadoopConfigurationProperty<Fadvise> GCS_INPUT_STREAM_FADVISE = + new HadoopConfigurationProperty<>("fs.gs.inputstream.fadvise", Fadvise.RANDOM); + + /** + * If false, reading a file with GZIP content encoding (HTTP header "Content-Encoding: gzip") will + * result in failure (IOException is thrown). 
+ */ + public static final HadoopConfigurationProperty<Boolean> + GCS_INPUT_STREAM_SUPPORT_GZIP_ENCODING_ENABLE = + new HadoopConfigurationProperty<>( + "fs.gs.inputstream.support.gzip.encoding.enable", + false); + + /** + * Minimum size in bytes of the HTTP Range header set in GCS request when opening new stream to + * read an object. + */ + public static final HadoopConfigurationProperty<Long> GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE = + new HadoopConfigurationProperty<>( + "fs.gs.inputstream.min.range.request.size", + 2 * 1024 * 1024L); + + /** + * Configuration key for number of request to track for adapting the access pattern i.e. fadvise: + * AUTO & AUTO_RANDOM. + */ + public static final HadoopConfigurationProperty<Integer> GCS_FADVISE_REQUEST_TRACK_COUNT = + new HadoopConfigurationProperty<>("fs.gs.fadvise.request.track.count", 3); + + /** + * Configuration key for specifying max number of bytes rewritten in a single rewrite request when + * fs.gs.copy.with.rewrite.enable is set to 'true'. + */ + public static final HadoopConfigurationProperty<Long> GCS_REWRITE_MAX_CHUNK_SIZE = + new HadoopConfigurationProperty<>( + "fs.gs.rewrite.max.chunk.size", + 512 * 1024 * 1024L); + + /** Configuration key for marker file pattern. Default value: none */ + public static final HadoopConfigurationProperty<String> GCS_MARKER_FILE_PATTERN = + new HadoopConfigurationProperty<>("fs.gs.marker.file.pattern"); + private final String workingDirectory; private final String projectId; + private final Configuration config; + private Pattern fileMarkerFilePattern; - public int getOutStreamBufferSize() { + int getOutStreamBufferSize() { return outStreamBufferSize; } private final int outStreamBufferSize; - GoogleHadoopFileSystemConfiguration(Configuration config) { - this.workingDirectory = GCS_WORKING_DIRECTORY.get(config, config::get); + GoogleHadoopFileSystemConfiguration(Configuration conf) { + this.workingDirectory = GCS_WORKING_DIRECTORY.get(conf, conf::get); this.outStreamBufferSize = - toIntExact(GCS_OUTPUT_STREAM_BUFFER_SIZE.get(config, config::getLongBytes)); - this.projectId = GCS_PROJECT_ID.get(config, config::get); + toIntExact(GCS_OUTPUT_STREAM_BUFFER_SIZE.get(conf, conf::getLongBytes)); + this.projectId = GCS_PROJECT_ID.get(conf, conf::get); + this.config = conf; } - public String getWorkingDirectory() { + String getWorkingDirectory() { return this.workingDirectory; } @@ -79,7 +139,53 @@ String getProjectId() { return this.projectId; } - public long getMaxListItemsPerCall() { + long getMaxListItemsPerCall() { return 5000L; //TODO: Make this configurable } + + Fadvise getFadvise() { + return GCS_INPUT_STREAM_FADVISE.get(config, config::getEnum); + } + + long getInplaceSeekLimit() { + return GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT.get(config, config::getLongBytes); + } + + public int getFadviseRequestTrackCount() { + return GCS_FADVISE_REQUEST_TRACK_COUNT.get(config, config::getInt); + } + + public boolean isGzipEncodingSupportEnabled() { + return GCS_INPUT_STREAM_SUPPORT_GZIP_ENCODING_ENABLE.get(config, config::getBoolean); + } + + public long getMinRangeRequestSize() { + return GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE.get(config, config::getLongBytes); + } + + public long getBlockSize() { + return BLOCK_SIZE.get(config, config::getLong); + } + + public boolean isReadExactRequestedBytesEnabled() { + return false; //TODO: Remove this option? 
+ } + + public long getMaxRewriteChunkSize() { + return GCS_REWRITE_MAX_CHUNK_SIZE.get(config, config::getLong); + } + + public Pattern getMarkerFilePattern() { + String pattern = GCS_MARKER_FILE_PATTERN.get(config, config::get); + if (pattern == null) { + return null; + } + + if (fileMarkerFilePattern == null) { + // Caching the pattern since compile step can be expensive + fileMarkerFilePattern = Pattern.compile("^(.+/)?" + pattern + "$"); + } + + return fileMarkerFilePattern; + } } diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Gs.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Gs.java new file mode 100644 index 00000000000..e0b5ec4acaf --- /dev/null +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Gs.java @@ -0,0 +1,64 @@ +package org.apache.hadoop.fs.gs; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.DelegateToFileSystem; + +/** + * GCS implementation of AbstractFileSystem. + * This impl delegates to the GoogleHadoopFileSystem + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class Gs extends DelegateToFileSystem { + public Gs(URI theUri, Configuration conf) throws IOException, URISyntaxException { + super(theUri, new GoogleHadoopFileSystem(), conf, + theUri.getScheme().isEmpty() ? Constants.SCHEME : theUri.getScheme(), false); + } + + @Override + public int getUriDefaultPort() { + return super.getUriDefaultPort(); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("gs{"); + sb.append("URI =").append(fsImpl.getUri()); + sb.append("; fsImpl=").append(fsImpl); + sb.append('}'); + return sb.toString(); + } + + /** + * Close the file system; the FileContext API doesn't have an explicit close. 
+ */ + @Override + protected void finalize() throws Throwable { + fsImpl.close(); + super.finalize(); + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java index 2bc74c6fc21..6ef7b7641f5 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java @@ -22,6 +22,10 @@ final class ListFileOptions { static final ListFileOptions OBJECTFIELDS = new ListFileOptions("bucket,name,size,updated"); + + static final ListFileOptions DELETE_RENAME_LIST_OPTIONS = + new ListFileOptions("bucket,name,generation"); + private final String fields; private ListFileOptions(@Nonnull String fields) { diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractOpen.java similarity index 64% copy from hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java copy to hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractOpen.java index 2bc74c6fc21..c3d7c6ddac7 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java +++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractOpen.java @@ -16,19 +16,16 @@ * limitations under the License. */ -package org.apache.hadoop.fs.gs; +package org.apache.hadoop.fs.gs.contract; -import javax.annotation.Nonnull; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractOpenTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; -final class ListFileOptions { - static final ListFileOptions OBJECTFIELDS = new ListFileOptions("bucket,name,size,updated"); - private final String fields; - - private ListFileOptions(@Nonnull String fields) { - this.fields = fields; - } - - String getFields() { - return fields; +/** GCS contract tests covering file open. */ +public class ITestGoogleContractOpen extends AbstractContractOpenTest { + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new GoogleContract(conf); } } diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java new file mode 100644 index 00000000000..5d9459cac19 --- /dev/null +++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.gs.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractRenameTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.ContractTestUtils; + +/** GCS contract tests covering file rename. */ +public class ITestGoogleContractRename extends AbstractContractRenameTest { + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new GoogleContract(conf); + } + + @Override + public void testRenameWithNonEmptySubDir() { + // TODO: Enable this + ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change"); + } + + @Override + public void testRenameNonexistentFile() { + // TODO: Enable this + ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change"); + } +} diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractSeek.java similarity index 67% copy from hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java copy to hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractSeek.java index 2bc74c6fc21..651487bd641 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ListFileOptions.java +++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractSeek.java @@ -16,19 +16,15 @@ * limitations under the License. */ -package org.apache.hadoop.fs.gs; +package org.apache.hadoop.fs.gs.contract; -import javax.annotation.Nonnull; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractSeekTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; -final class ListFileOptions { - static final ListFileOptions OBJECTFIELDS = new ListFileOptions("bucket,name,size,updated"); - private final String fields; - - private ListFileOptions(@Nonnull String fields) { - this.fields = fields; - } - - String getFields() { - return fields; +public class ITestGoogleContractSeek extends AbstractContractSeekTest { + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new GoogleContract(conf); } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org
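
Usage sketch for the new rename support (illustrative only, not part of the committed patch; the bucket and paths below are hypothetical). As the renameDirectoryInternal javadoc in this change notes, GCS has no atomic rename: a directory rename copies source metadata under the destination and then deletes the source objects, cross-bucket moves are rejected with UnsupportedOperationException, and rename() returns false rather than throwing for cases such as a root source or a missing destination parent.

    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class GcsRenameSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Hypothetical bucket; any writable gs:// URI would do.
        FileSystem fs = FileSystem.get(URI.create("gs://example-bucket/"), conf);

        Path src = new Path("gs://example-bucket/staging/run-1");
        Path dst = new Path("gs://example-bucket/committed/run-1");

        // Not atomic: metadata is copied to dst, then the src objects are deleted,
        // so a failure part-way through can leave both prefixes partially populated.
        boolean renamed = fs.rename(src, dst);
        if (!renamed) {
          // false (rather than an exception) is returned e.g. when src is the
          // filesystem root or the destination parent does not exist.
          System.err.println("rename failed: " + src + " -> " + dst);
        }
      }
    }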
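
A second sketch, this time of the new read-path configuration keys added in GoogleHadoopFileSystemConfiguration; the chosen values and the object read here are assumptions for illustration, not defaults taken from the change.

    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class GcsReadTuningSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();

        // Keys introduced by this change; the values here are illustrative.
        conf.set("fs.gs.inputstream.fadvise", "AUTO"); // RANDOM | SEQUENTIAL | AUTO | AUTO_RANDOM
        conf.setLong("fs.gs.inputstream.inplace.seek.limit", 4L * 1024 * 1024);
        conf.setLong("fs.gs.inputstream.min.range.request.size", 2L * 1024 * 1024);
        conf.setBoolean("fs.gs.inputstream.support.gzip.encoding.enable", false);

        // Hypothetical object used only for illustration.
        FileSystem fs = FileSystem.get(URI.create("gs://example-bucket/"), conf);
        try (FSDataInputStream in = fs.open(new Path("gs://example-bucket/data/part-00000"))) {
          byte[] buf = new byte[8192];
          int first = in.read(buf);  // sequential read through GoogleHadoopFSInputStream
          in.seek(0);                // backward seek; fadvise governs how the channel reacts
          int second = in.read(buf);
          System.out.printf("read %d then %d bytes%n", first, second);
        }
      }
    }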