shunping commented on code in PR #36876:
URL: https://github.com/apache/beam/pull/36876#discussion_r2553133388
##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java:
##########
@@ -276,1177 +115,391 @@ public static boolean isWildcard(GcsPath spec) {
@Nullable Integer uploadBufferSizeBytes,
@Nullable Integer rewriteDataOpBatchLimit,
GcsCountersOptions gcsCountersOptions,
- GoogleCloudStorageReadOptions gcsReadOptions) {
- this.storageClient = storageClient;
- this.httpRequestInitializer = httpRequestInitializer;
- this.uploadBufferSizeBytes = uploadBufferSizeBytes;
- this.executorService = executorService;
- this.credentials = credentials;
- this.maxBytesRewrittenPerCall = null;
- this.numRewriteTokensUsed = null;
- googleCloudStorageOptions =
- GoogleCloudStorageOptions.builder()
- .setAppName("Beam")
- .setReadChannelOptions(gcsReadOptions)
- .setGrpcEnabled(shouldUseGrpc)
- .build();
- googleCloudStorage =
- createGoogleCloudStorage(googleCloudStorageOptions, storageClient,
credentials);
- this.batchRequestSupplier =
- () -> {
- // Capture reference to this so that the most recent storageClient
and initializer
- // are used.
- GcsUtil util = this;
- return new BatchInterface() {
- final BatchRequest batch =
util.storageClient.batch(util.httpRequestInitializer);
-
- @Override
- public <T> void queue(
- AbstractGoogleJsonClientRequest<T> request,
JsonBatchCallback<T> cb)
- throws IOException {
- request.queue(batch, cb);
- }
-
- @Override
- public void execute() throws IOException {
- batch.execute();
- }
-
- @Override
- public int size() {
- return batch.size();
- }
- };
- };
- this.rewriteDataOpBatchLimit =
- rewriteDataOpBatchLimit == null ? MAX_REQUESTS_PER_COPY_BATCH :
rewriteDataOpBatchLimit;
- this.gcsCountersOptions = gcsCountersOptions;
+ GcsOptions gcsOptions) {
+ this.delegate =
+ new GcsUtilLegacy(
+ storageClient,
+ httpRequestInitializer,
+ executorService,
+ shouldUseGrpc,
+ credentials,
+ uploadBufferSizeBytes,
+ rewriteDataOpBatchLimit,
+ gcsCountersOptions.delegate,
+ gcsOptions);
}
- // Use this only for testing purposes.
protected void setStorageClient(Storage storageClient) {
- this.storageClient = storageClient;
+ delegate.setStorageClient(storageClient);
}
- // Use this only for testing purposes.
protected void setBatchRequestSupplier(Supplier<BatchInterface> supplier) {
- this.batchRequestSupplier = supplier;
+ delegate.setBatchRequestSupplier(() -> new
LegacyBatchAdapter(supplier.get()));
}
- /**
- * Expands a pattern into matched paths. The pattern path may contain globs,
which are expanded in
- * the result. For patterns that only match a single object, we ensure that
the object exists.
- */
public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
- Pattern p = null;
- String prefix = null;
- if (isWildcard(gcsPattern)) {
- // Part before the first wildcard character.
- prefix = getNonWildcardPrefix(gcsPattern.getObject());
- p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject()));
- } else {
- // Not a wildcard.
- try {
- // Use a get request to fetch the metadata of the object, and ignore
the return value.
- // The request has strong global consistency.
- getObject(gcsPattern);
- return ImmutableList.of(gcsPattern);
- } catch (FileNotFoundException e) {
- // If the path was not found, return an empty list.
- return ImmutableList.of();
- }
- }
-
- LOG.debug(
- "matching files in bucket {}, prefix {} against pattern {}",
- gcsPattern.getBucket(),
- prefix,
- p.toString());
-
- String pageToken = null;
- List<GcsPath> results = new ArrayList<>();
- do {
- Objects objects = listObjects(gcsPattern.getBucket(), prefix, pageToken);
- if (objects.getItems() == null) {
- break;
- }
-
- // Filter objects based on the regex.
- for (StorageObject o : objects.getItems()) {
- String name = o.getName();
- // Skip directories, which end with a slash.
- if (p.matcher(name).matches() && !name.endsWith("/")) {
- LOG.debug("Matched object: {}", name);
- results.add(GcsPath.fromObject(o));
- }
- }
- pageToken = objects.getNextPageToken();
- } while (pageToken != null);
-
- return results;
+ return delegate.expand(gcsPattern);
}
@VisibleForTesting
@Nullable
Integer getUploadBufferSizeBytes() {
- return uploadBufferSizeBytes;
- }
-
- private static BackOff createBackOff() {
- return BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());
+ return delegate.getUploadBufferSizeBytes();
}
- /**
- * Returns the file size from GCS or throws {@link FileNotFoundException} if
the resource does not
- * exist.
- */
public long fileSize(GcsPath path) throws IOException {
- return getObject(path).getSize().longValue();
+ return delegate.fileSize(path);
}
- /** Returns the {@link StorageObject} for the given {@link GcsPath}. */
public StorageObject getObject(GcsPath gcsPath) throws IOException {
- return getObject(gcsPath, createBackOff(), Sleeper.DEFAULT);
+ return delegate.getObject(gcsPath);
}
@VisibleForTesting
StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper)
throws IOException {
- Storage.Objects.Get getObject =
- storageClient.objects().get(gcsPath.getBucket(), gcsPath.getObject());
- try {
- return ResilientOperation.retry(
- getObject::execute, backoff, RetryDeterminer.SOCKET_ERRORS,
IOException.class, sleeper);
- } catch (IOException | InterruptedException e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- if (e instanceof IOException &&
errorExtractor.itemNotFound((IOException) e)) {
- throw new FileNotFoundException(gcsPath.toString());
- }
- throw new IOException(
- String.format("Unable to get the file object for path %s.",
gcsPath), e);
- }
+ return delegate.getObject(gcsPath, backoff, sleeper);
}
- /**
- * Returns {@link StorageObjectOrIOException StorageObjectOrIOExceptions}
for the given {@link
- * GcsPath GcsPaths}.
- */
public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths)
throws IOException {
- if (gcsPaths.isEmpty()) {
- return ImmutableList.of();
- } else if (gcsPaths.size() == 1) {
- GcsPath path = gcsPaths.get(0);
- try {
- StorageObject object = getObject(path);
- return ImmutableList.of(StorageObjectOrIOException.create(object));
- } catch (IOException e) {
- return ImmutableList.of(StorageObjectOrIOException.create(e));
- } catch (Exception e) {
- IOException ioException =
- new IOException(String.format("Error trying to get %s: %s", path,
e));
- return
ImmutableList.of(StorageObjectOrIOException.create(ioException));
- }
- }
-
- List<StorageObjectOrIOException[]> results = new ArrayList<>();
- executeBatches(makeGetBatches(gcsPaths, results));
- ImmutableList.Builder<StorageObjectOrIOException> ret =
ImmutableList.builder();
- for (StorageObjectOrIOException[] result : results) {
- ret.add(result[0]);
- }
- return ret.build();
+ List<GcsUtilLegacy.StorageObjectOrIOException> legacy =
delegate.getObjects(gcsPaths);
+ return legacy.stream()
+ .map(StorageObjectOrIOException::fromLegacy)
+ .collect(java.util.stream.Collectors.toList());
}
public Objects listObjects(String bucket, String prefix, @Nullable String
pageToken)
throws IOException {
- return listObjects(bucket, prefix, pageToken, null);
+ return delegate.listObjects(bucket, prefix, pageToken);
}
- /**
- * Lists {@link Objects} given the {@code bucket}, {@code prefix}, {@code
pageToken}.
- *
- * <p>For more details, see
https://cloud.google.com/storage/docs/json_api/v1/objects/list.
- */
public Objects listObjects(
String bucket, String prefix, @Nullable String pageToken, @Nullable
String delimiter)
throws IOException {
- // List all objects that start with the prefix (including objects in
sub-directories).
- Storage.Objects.List listObject = storageClient.objects().list(bucket);
- listObject.setMaxResults(MAX_LIST_ITEMS_PER_CALL);
- listObject.setPrefix(prefix);
- listObject.setDelimiter(delimiter);
-
- if (pageToken != null) {
- listObject.setPageToken(pageToken);
- }
-
- try {
- return ResilientOperation.retry(
- listObject::execute, createBackOff(), RetryDeterminer.SOCKET_ERRORS,
IOException.class);
- } catch (Exception e) {
- throw new IOException(
- String.format("Unable to match files in bucket %s, prefix %s.",
bucket, prefix), e);
- }
+ return delegate.listObjects(bucket, prefix, pageToken, delimiter);
}
- /**
- * Returns the file size from GCS or throws {@link FileNotFoundException} if
the resource does not
- * exist.
- */
@VisibleForTesting
List<Long> fileSizes(List<GcsPath> paths) throws IOException {
- List<StorageObjectOrIOException> results = getObjects(paths);
-
- ImmutableList.Builder<Long> ret = ImmutableList.builder();
- for (StorageObjectOrIOException result : results) {
- ret.add(toFileSize(result));
- }
- return ret.build();
- }
-
- private Long toFileSize(StorageObjectOrIOException
storageObjectOrIOException)
- throws IOException {
- if (storageObjectOrIOException.ioException() != null) {
- throw storageObjectOrIOException.ioException();
- } else {
- return storageObjectOrIOException.storageObject().getSize().longValue();
- }
- }
-
- @VisibleForTesting
- void setCloudStorageImpl(GoogleCloudStorage g) {
- googleCloudStorage = g;
- }
-
- @VisibleForTesting
- void setCloudStorageImpl(GoogleCloudStorageOptions g) {
- googleCloudStorageOptions = g;
- }
-
- /**
- * Create an integer consumer that updates the counter identified by a
prefix and a bucket name.
- */
- private static Consumer<Integer> createCounterConsumer(String
counterNamePrefix, String bucket) {
- return Metrics.counter(GcsUtil.class, String.format("%s_%s",
counterNamePrefix, bucket))::inc;
+ return delegate.fileSizes(paths);
}
- private WritableByteChannel wrapInCounting(
- WritableByteChannel writableByteChannel, String bucket) {
- if (writableByteChannel instanceof CountingWritableByteChannel) {
- return writableByteChannel;
- }
- return Optional.ofNullable(gcsCountersOptions.getWriteCounterPrefix())
- .<WritableByteChannel>map(
- prefix -> {
- LOG.debug(
- "wrapping writable byte channel using counter name prefix {}
and bucket {}",
- prefix,
- bucket);
- return new CountingWritableByteChannel(
- writableByteChannel, createCounterConsumer(prefix, bucket));
- })
- .orElse(writableByteChannel);
- }
-
- private SeekableByteChannel wrapInCounting(
- SeekableByteChannel seekableByteChannel, String bucket) {
- if (seekableByteChannel instanceof CountingSeekableByteChannel
- || !gcsCountersOptions.hasAnyPrefix()) {
- return seekableByteChannel;
- }
-
- return new CountingSeekableByteChannel(
- seekableByteChannel,
- Optional.ofNullable(gcsCountersOptions.getReadCounterPrefix())
- .map(
- prefix -> {
- LOG.debug(
- "wrapping seekable byte channel with \"bytes read\"
counter name prefix {}"
- + " and bucket {}",
- prefix,
- bucket);
- return createCounterConsumer(prefix, bucket);
- })
- .orElse(null),
- Optional.ofNullable(gcsCountersOptions.getWriteCounterPrefix())
- .map(
- prefix -> {
- LOG.debug(
- "wrapping seekable byte channel with \"bytes written\"
counter name prefix {}"
- + " and bucket {}",
- prefix,
- bucket);
- return createCounterConsumer(prefix, bucket);
- })
- .orElse(null));
- }
-
- /**
- * Opens an object in GCS.
- *
- * <p>Returns a SeekableByteChannel that provides access to data in the
bucket.
- *
- * @param path the GCS filename to read from
- * @return a SeekableByteChannel that can read the object data
- */
public SeekableByteChannel open(GcsPath path) throws IOException {
- String bucket = path.getBucket();
- SeekableByteChannel channel =
- googleCloudStorage.open(
- new StorageResourceId(path.getBucket(), path.getObject()),
- this.googleCloudStorageOptions.getReadChannelOptions());
- return wrapInCounting(channel, bucket);
- }
-
- /**
- * Opens an object in GCS.
- *
- * <p>Returns a SeekableByteChannel that provides access to data in the
bucket.
- *
- * @param path the GCS filename to read from
- * @param readOptions Fine-grained options for behaviors of retries,
buffering, etc.
- * @return a SeekableByteChannel that can read the object data
- */
- @VisibleForTesting
- SeekableByteChannel open(GcsPath path, GoogleCloudStorageReadOptions
readOptions)
- throws IOException {
- HashMap<String, String> baseLabels = new HashMap<>();
- baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
- baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Storage");
- baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "GcsGet");
- baseLabels.put(
- MonitoringInfoConstants.Labels.RESOURCE,
- GcpResourceIdentifiers.cloudStorageBucket(path.getBucket()));
- baseLabels.put(
- MonitoringInfoConstants.Labels.GCS_PROJECT_ID,
- String.valueOf(googleCloudStorageOptions.getProjectId()));
- baseLabels.put(MonitoringInfoConstants.Labels.GCS_BUCKET,
path.getBucket());
-
- ServiceCallMetric serviceCallMetric =
- new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT,
baseLabels);
- try {
- SeekableByteChannel channel =
- googleCloudStorage.open(
- new StorageResourceId(path.getBucket(), path.getObject()),
readOptions);
- serviceCallMetric.call("ok");
- return wrapInCounting(channel, path.getBucket());
- } catch (IOException e) {
- if (e.getCause() instanceof GoogleJsonResponseException) {
- serviceCallMetric.call(((GoogleJsonResponseException)
e.getCause()).getDetails().getCode());
- }
- throw e;
- }
+ return delegate.open(path);
}
/** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */
@Deprecated
public WritableByteChannel create(GcsPath path, String type) throws
IOException {
- CreateOptions.Builder builder =
CreateOptions.builder().setContentType(type);
- return create(path, builder.build());
+ return delegate.create(path, type);
}
/** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */
@Deprecated
public WritableByteChannel create(GcsPath path, String type, Integer
uploadBufferSizeBytes)
throws IOException {
- CreateOptions.Builder builder =
- CreateOptions.builder()
- .setContentType(type)
- .setUploadBufferSizeBytes(uploadBufferSizeBytes);
- return create(path, builder.build());
+ return delegate.create(path, type, uploadBufferSizeBytes);
}
- @AutoValue
- public abstract static class CreateOptions {
- /**
- * If true, the created file is expected to not exist. Instead of checking
for file presence
- * before writing a write exception may occur if the file does exist.
- */
- public abstract boolean getExpectFileToNotExist();
+ public static class CreateOptions {
+ final GcsUtilLegacy.CreateOptions delegate;
+
+ private CreateOptions(GcsUtilLegacy.CreateOptions delegate) {
+ this.delegate = delegate;
+ }
+
+ public boolean getExpectFileToNotExist() {
+ return delegate.getExpectFileToNotExist();
+ }
- /**
- * If non-null, the upload buffer size to be used. If null, the buffer
size corresponds to {code
- * GCSUtil.getUploadBufferSizeBytes}
- */
- public abstract @Nullable Integer getUploadBufferSizeBytes();
+ public @Nullable Integer getUploadBufferSizeBytes() {
+ return delegate.getUploadBufferSizeBytes();
+ }
- /** The content type for the created file, eg "text/plain". */
- public abstract @Nullable String getContentType();
+ public @Nullable String getContentType() {
+ return delegate.getContentType();
+ }
public static Builder builder() {
- return new
AutoValue_GcsUtil_CreateOptions.Builder().setExpectFileToNotExist(false);
+ return new Builder(GcsUtilLegacy.CreateOptions.builder());
}
- @AutoValue.Builder
- public abstract static class Builder {
- public abstract Builder setContentType(String value);
+ public static class Builder {
+ private final GcsUtilLegacy.CreateOptions.Builder delegateBuilder;
- public abstract Builder setUploadBufferSizeBytes(int value);
+ private Builder(GcsUtilLegacy.CreateOptions.Builder delegateBuilder) {
+ this.delegateBuilder = delegateBuilder;
+ }
- public abstract Builder setExpectFileToNotExist(boolean value);
+ public Builder setContentType(String value) {
+ delegateBuilder.setContentType(value);
+ return this;
+ }
- public abstract CreateOptions build();
- }
- }
+ public Builder setUploadBufferSizeBytes(int value) {
+ delegateBuilder.setUploadBufferSizeBytes(value);
+ return this;
+ }
- /**
- * Creates an object in GCS and prepares for uploading its contents.
- *
- * @param path the GCS file to write to
- * @param options to be used for creating and configuring file upload
- * @return a WritableByteChannel that can be used to write data to the
object.
- */
- public WritableByteChannel create(GcsPath path, CreateOptions options)
throws IOException {
- AsyncWriteChannelOptions wcOptions =
googleCloudStorageOptions.getWriteChannelOptions();
- @Nullable
- Integer uploadBufferSizeBytes =
- options.getUploadBufferSizeBytes() != null
- ? options.getUploadBufferSizeBytes()
- : getUploadBufferSizeBytes();
- if (uploadBufferSizeBytes != null) {
- wcOptions =
wcOptions.toBuilder().setUploadChunkSize(uploadBufferSizeBytes).build();
- }
- GoogleCloudStorageOptions newGoogleCloudStorageOptions =
-
googleCloudStorageOptions.toBuilder().setWriteChannelOptions(wcOptions).build();
- GoogleCloudStorage gcpStorage =
- createGoogleCloudStorage(
- newGoogleCloudStorageOptions, this.storageClient,
this.credentials);
- StorageResourceId resourceId =
- new StorageResourceId(
- path.getBucket(),
- path.getObject(),
- // If we expect the file not to exist, we set a generation id of
0. This avoids a read
- // to identify the object exists already and should be overwritten.
- // See {@link GoogleCloudStorage#create(StorageResourceId,
GoogleCloudStorageOptions)}
- options.getExpectFileToNotExist() ? 0L :
StorageResourceId.UNKNOWN_GENERATION_ID);
- CreateObjectOptions.Builder createBuilder =
- CreateObjectOptions.builder().setOverwriteExisting(true);
- if (options.getContentType() != null) {
- createBuilder = createBuilder.setContentType(options.getContentType());
- }
+ public Builder setExpectFileToNotExist(boolean value) {
+ delegateBuilder.setExpectFileToNotExist(value);
+ return this;
+ }
- HashMap<String, String> baseLabels = new HashMap<>();
- baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
- baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Storage");
- baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "GcsInsert");
- baseLabels.put(
- MonitoringInfoConstants.Labels.RESOURCE,
- GcpResourceIdentifiers.cloudStorageBucket(path.getBucket()));
- baseLabels.put(
- MonitoringInfoConstants.Labels.GCS_PROJECT_ID,
- String.valueOf(googleCloudStorageOptions.getProjectId()));
- baseLabels.put(MonitoringInfoConstants.Labels.GCS_BUCKET,
path.getBucket());
-
- ServiceCallMetric serviceCallMetric =
- new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT,
baseLabels);
- try {
- WritableByteChannel channel = gcpStorage.create(resourceId,
createBuilder.build());
- serviceCallMetric.call("ok");
- return wrapInCounting(channel, path.getBucket());
- } catch (IOException e) {
- if (e.getCause() instanceof GoogleJsonResponseException) {
- serviceCallMetric.call(((GoogleJsonResponseException)
e.getCause()).getDetails().getCode());
+ public CreateOptions build() {
+ return new CreateOptions(delegateBuilder.build());
}
- throw e;
}
}
- GoogleCloudStorage createGoogleCloudStorage(
Review Comment:
This function is also removed from `GcsUtil.java`, but will remain in
`GcsUtilLegacy.java`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]