shunping commented on code in PR #36876:
URL: https://github.com/apache/beam/pull/36876#discussion_r2553131742
##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java:
##########
@@ -276,1177 +115,391 @@ public static boolean isWildcard(GcsPath spec) {
@Nullable Integer uploadBufferSizeBytes,
@Nullable Integer rewriteDataOpBatchLimit,
GcsCountersOptions gcsCountersOptions,
- GoogleCloudStorageReadOptions gcsReadOptions) {
- this.storageClient = storageClient;
- this.httpRequestInitializer = httpRequestInitializer;
- this.uploadBufferSizeBytes = uploadBufferSizeBytes;
- this.executorService = executorService;
- this.credentials = credentials;
- this.maxBytesRewrittenPerCall = null;
- this.numRewriteTokensUsed = null;
- googleCloudStorageOptions =
- GoogleCloudStorageOptions.builder()
- .setAppName("Beam")
- .setReadChannelOptions(gcsReadOptions)
- .setGrpcEnabled(shouldUseGrpc)
- .build();
- googleCloudStorage =
- createGoogleCloudStorage(googleCloudStorageOptions, storageClient,
credentials);
- this.batchRequestSupplier =
- () -> {
- // Capture reference to this so that the most recent storageClient
and initializer
- // are used.
- GcsUtil util = this;
- return new BatchInterface() {
- final BatchRequest batch =
util.storageClient.batch(util.httpRequestInitializer);
-
- @Override
- public <T> void queue(
- AbstractGoogleJsonClientRequest<T> request,
JsonBatchCallback<T> cb)
- throws IOException {
- request.queue(batch, cb);
- }
-
- @Override
- public void execute() throws IOException {
- batch.execute();
- }
-
- @Override
- public int size() {
- return batch.size();
- }
- };
- };
- this.rewriteDataOpBatchLimit =
- rewriteDataOpBatchLimit == null ? MAX_REQUESTS_PER_COPY_BATCH :
rewriteDataOpBatchLimit;
- this.gcsCountersOptions = gcsCountersOptions;
+ GcsOptions gcsOptions) {
+ this.delegate =
+ new GcsUtilLegacy(
+ storageClient,
+ httpRequestInitializer,
+ executorService,
+ shouldUseGrpc,
+ credentials,
+ uploadBufferSizeBytes,
+ rewriteDataOpBatchLimit,
+ gcsCountersOptions.delegate,
+ gcsOptions);
}
- // Use this only for testing purposes.
protected void setStorageClient(Storage storageClient) {
- this.storageClient = storageClient;
+ delegate.setStorageClient(storageClient);
}
- // Use this only for testing purposes.
protected void setBatchRequestSupplier(Supplier<BatchInterface> supplier) {
- this.batchRequestSupplier = supplier;
+ delegate.setBatchRequestSupplier(() -> new
LegacyBatchAdapter(supplier.get()));
}
- /**
- * Expands a pattern into matched paths. The pattern path may contain globs,
which are expanded in
- * the result. For patterns that only match a single object, we ensure that
the object exists.
- */
public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
- Pattern p = null;
- String prefix = null;
- if (isWildcard(gcsPattern)) {
- // Part before the first wildcard character.
- prefix = getNonWildcardPrefix(gcsPattern.getObject());
- p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject()));
- } else {
- // Not a wildcard.
- try {
- // Use a get request to fetch the metadata of the object, and ignore
the return value.
- // The request has strong global consistency.
- getObject(gcsPattern);
- return ImmutableList.of(gcsPattern);
- } catch (FileNotFoundException e) {
- // If the path was not found, return an empty list.
- return ImmutableList.of();
- }
- }
-
- LOG.debug(
- "matching files in bucket {}, prefix {} against pattern {}",
- gcsPattern.getBucket(),
- prefix,
- p.toString());
-
- String pageToken = null;
- List<GcsPath> results = new ArrayList<>();
- do {
- Objects objects = listObjects(gcsPattern.getBucket(), prefix, pageToken);
- if (objects.getItems() == null) {
- break;
- }
-
- // Filter objects based on the regex.
- for (StorageObject o : objects.getItems()) {
- String name = o.getName();
- // Skip directories, which end with a slash.
- if (p.matcher(name).matches() && !name.endsWith("/")) {
- LOG.debug("Matched object: {}", name);
- results.add(GcsPath.fromObject(o));
- }
- }
- pageToken = objects.getNextPageToken();
- } while (pageToken != null);
-
- return results;
+ return delegate.expand(gcsPattern);
}
@VisibleForTesting
@Nullable
Integer getUploadBufferSizeBytes() {
- return uploadBufferSizeBytes;
- }
-
- private static BackOff createBackOff() {
- return BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());
+ return delegate.getUploadBufferSizeBytes();
}
- /**
- * Returns the file size from GCS or throws {@link FileNotFoundException} if
the resource does not
- * exist.
- */
public long fileSize(GcsPath path) throws IOException {
- return getObject(path).getSize().longValue();
+ return delegate.fileSize(path);
}
- /** Returns the {@link StorageObject} for the given {@link GcsPath}. */
public StorageObject getObject(GcsPath gcsPath) throws IOException {
- return getObject(gcsPath, createBackOff(), Sleeper.DEFAULT);
+ return delegate.getObject(gcsPath);
}
@VisibleForTesting
StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper)
throws IOException {
- Storage.Objects.Get getObject =
- storageClient.objects().get(gcsPath.getBucket(), gcsPath.getObject());
- try {
- return ResilientOperation.retry(
- getObject::execute, backoff, RetryDeterminer.SOCKET_ERRORS,
IOException.class, sleeper);
- } catch (IOException | InterruptedException e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- if (e instanceof IOException &&
errorExtractor.itemNotFound((IOException) e)) {
- throw new FileNotFoundException(gcsPath.toString());
- }
- throw new IOException(
- String.format("Unable to get the file object for path %s.",
gcsPath), e);
- }
+ return delegate.getObject(gcsPath, backoff, sleeper);
}
- /**
- * Returns {@link StorageObjectOrIOException StorageObjectOrIOExceptions}
for the given {@link
- * GcsPath GcsPaths}.
- */
public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths)
throws IOException {
- if (gcsPaths.isEmpty()) {
- return ImmutableList.of();
- } else if (gcsPaths.size() == 1) {
- GcsPath path = gcsPaths.get(0);
- try {
- StorageObject object = getObject(path);
- return ImmutableList.of(StorageObjectOrIOException.create(object));
- } catch (IOException e) {
- return ImmutableList.of(StorageObjectOrIOException.create(e));
- } catch (Exception e) {
- IOException ioException =
- new IOException(String.format("Error trying to get %s: %s", path,
e));
- return
ImmutableList.of(StorageObjectOrIOException.create(ioException));
- }
- }
-
- List<StorageObjectOrIOException[]> results = new ArrayList<>();
- executeBatches(makeGetBatches(gcsPaths, results));
- ImmutableList.Builder<StorageObjectOrIOException> ret =
ImmutableList.builder();
- for (StorageObjectOrIOException[] result : results) {
- ret.add(result[0]);
- }
- return ret.build();
+ List<GcsUtilLegacy.StorageObjectOrIOException> legacy =
delegate.getObjects(gcsPaths);
+ return legacy.stream()
+ .map(StorageObjectOrIOException::fromLegacy)
+ .collect(java.util.stream.Collectors.toList());
}
public Objects listObjects(String bucket, String prefix, @Nullable String
pageToken)
throws IOException {
- return listObjects(bucket, prefix, pageToken, null);
+ return delegate.listObjects(bucket, prefix, pageToken);
}
- /**
- * Lists {@link Objects} given the {@code bucket}, {@code prefix}, {@code
pageToken}.
- *
- * <p>For more details, see
https://cloud.google.com/storage/docs/json_api/v1/objects/list.
- */
public Objects listObjects(
String bucket, String prefix, @Nullable String pageToken, @Nullable
String delimiter)
throws IOException {
- // List all objects that start with the prefix (including objects in
sub-directories).
- Storage.Objects.List listObject = storageClient.objects().list(bucket);
- listObject.setMaxResults(MAX_LIST_ITEMS_PER_CALL);
- listObject.setPrefix(prefix);
- listObject.setDelimiter(delimiter);
-
- if (pageToken != null) {
- listObject.setPageToken(pageToken);
- }
-
- try {
- return ResilientOperation.retry(
- listObject::execute, createBackOff(), RetryDeterminer.SOCKET_ERRORS,
IOException.class);
- } catch (Exception e) {
- throw new IOException(
- String.format("Unable to match files in bucket %s, prefix %s.",
bucket, prefix), e);
- }
+ return delegate.listObjects(bucket, prefix, pageToken, delimiter);
}
- /**
- * Returns the file size from GCS or throws {@link FileNotFoundException} if
the resource does not
- * exist.
- */
@VisibleForTesting
List<Long> fileSizes(List<GcsPath> paths) throws IOException {
- List<StorageObjectOrIOException> results = getObjects(paths);
-
- ImmutableList.Builder<Long> ret = ImmutableList.builder();
- for (StorageObjectOrIOException result : results) {
- ret.add(toFileSize(result));
- }
- return ret.build();
- }
-
- private Long toFileSize(StorageObjectOrIOException
storageObjectOrIOException)
- throws IOException {
- if (storageObjectOrIOException.ioException() != null) {
- throw storageObjectOrIOException.ioException();
- } else {
- return storageObjectOrIOException.storageObject().getSize().longValue();
- }
- }
-
- @VisibleForTesting
- void setCloudStorageImpl(GoogleCloudStorage g) {
Review Comment:
While `setCloudStorageImpl` is no longer surfaced for testing in
GcsUtil.java, we still keep it in GcsUtilLegacy.java.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]