shunping commented on code in PR #36876:
URL: https://github.com/apache/beam/pull/36876#discussion_r2553137475


##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilLegacy.java:
##########
@@ -0,0 +1,1448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.util;
+
+import static org.apache.beam.sdk.io.FileSystemUtils.wildcardToRegexp;
+import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.client.googleapis.batch.BatchRequest;
+import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
+import com.google.api.client.googleapis.json.GoogleJsonError;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import 
com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest;
+import com.google.api.client.http.HttpHeaders;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.storage.Storage;
+import com.google.api.services.storage.model.Bucket;
+import com.google.api.services.storage.model.Objects;
+import com.google.api.services.storage.model.RewriteResponse;
+import com.google.api.services.storage.model.StorageObject;
+import com.google.auth.Credentials;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.hadoop.gcsio.CreateObjectOptions;
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorage;
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl;
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions;
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions;
+import com.google.cloud.hadoop.gcsio.StorageResourceId;
+import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
+import com.google.cloud.hadoop.util.ResilientOperation;
+import com.google.cloud.hadoop.util.RetryDeterminer;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import 
org.apache.beam.sdk.extensions.gcp.util.channels.CountingSeekableByteChannel;
+import 
org.apache.beam.sdk.extensions.gcp.util.channels.CountingWritableByteChannel;
+import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.MoreFutures;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Provides operations on GCS. */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class GcsUtilLegacy {
+
+  @AutoValue
+  public abstract static class GcsCountersOptions {
+    public abstract @Nullable String getReadCounterPrefix();
+
+    public abstract @Nullable String getWriteCounterPrefix();
+
+    public boolean hasAnyPrefix() {
+      return getWriteCounterPrefix() != null || getReadCounterPrefix() != null;
+    }
+
+    public static GcsCountersOptions create(
+        @Nullable String readCounterPrefix, @Nullable String 
writeCounterPrefix) {
+      return new AutoValue_GcsUtilLegacy_GcsCountersOptions(readCounterPrefix, 
writeCounterPrefix);
+    }
+  }
+
+  /**
+   * {@link DefaultValueFactory} that always supplies {@link GoogleCloudStorageReadOptions#DEFAULT},
+   * regardless of the given {@link PipelineOptions}.
+   */
+  public static class GcsReadOptionsFactory
+      implements DefaultValueFactory<GoogleCloudStorageReadOptions> {
+    @Override
+    public GoogleCloudStorageReadOptions create(PipelineOptions options) {
+      final GoogleCloudStorageReadOptions defaults = GoogleCloudStorageReadOptions.DEFAULT;
+      return defaults;
+    }
+  }
+
+  /**
+   * This is a {@link DefaultValueFactory} able to create a {@link 
GcsUtilLegacy} using any
+   * transport flags specified on the {@link PipelineOptions}.
+   */
+  public static class GcsUtilFactory implements 
DefaultValueFactory<GcsUtilLegacy> {
+    /**
+     * Returns an instance of {@link GcsUtilLegacy} based on the {@link 
PipelineOptions}.
+     *
+     * <p>If no instance has previously been created, one is created and the 
value stored in {@code
+     * options}.
+     */
+    @Override
+    public GcsUtilLegacy create(PipelineOptions options) {
+      LOG.debug("Creating new GcsUtil");
+      GcsOptions gcsOptions = options.as(GcsOptions.class);
+      Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions);
+      return new GcsUtilLegacy(
+          storageBuilder.build(),
+          storageBuilder.getHttpRequestInitializer(),
+          gcsOptions.getExecutorService(),
+          hasExperiment(options, "use_grpc_for_gcs"),
+          gcsOptions.getGcpCredential(),
+          gcsOptions.getGcsUploadBufferSizeBytes(),
+          gcsOptions.getGcsRewriteDataOpBatchLimit(),
+          GcsCountersOptions.create(
+              gcsOptions.getEnableBucketReadMetricCounter()
+                  ? gcsOptions.getGcsReadCounterPrefix()
+                  : null,
+              gcsOptions.getEnableBucketWriteMetricCounter()
+                  ? gcsOptions.getGcsWriteCounterPrefix()
+                  : null),
+          gcsOptions.getGoogleCloudStorageReadOptions());
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(GcsUtilLegacy.class);
+
+  /** Maximum number of items to retrieve per Objects.List request. */
+  private static final long MAX_LIST_ITEMS_PER_CALL = 1024;
+
+  /**
+   * Matches a glob containing a wildcard ({@code [}, {@code *} or {@code ?}), capturing the portion
+   * before the first wildcard in the named group {@code PREFIX}.
+   */
+  private static final Pattern GLOB_PREFIX = Pattern.compile("(?<PREFIX>[^\\[*?]*)[\\[*?].*");
+
+  /** Maximum number of requests permitted in a GCS batch request. */
+  private static final int MAX_REQUESTS_PER_BATCH = 100;
+  /**
+   * Default maximum number of requests permitted in a GCS batch request where data is copied. Used
+   * by the constructor when no rewrite batch limit is supplied.
+   */
+  private static final int MAX_REQUESTS_PER_COPY_BATCH = 10;
+  /** Maximum number of concurrent batches of requests executing on GCS. */
+  private static final int MAX_CONCURRENT_BATCHES = 256;
+
+  // Backoff policy for retried GCS calls: up to 10 retries, starting from a 1 second delay.
+  private static final FluentBackoff BACKOFF_FACTORY =
+      FluentBackoff.DEFAULT.withMaxRetries(10).withInitialBackoff(Duration.standardSeconds(1));
+  // Retries HTTP 408, 429 and 5xx responses; any other IOException is delegated to
+  // RetryDeterminer.SOCKET_ERRORS. NOTE(review): the methods visible in this hunk pass
+  // RetryDeterminer.SOCKET_ERRORS directly rather than this field — confirm where it is used.
+  private static final RetryDeterminer<IOException> RETRY_DETERMINER =
+      new RetryDeterminer<IOException>() {
+        @Override
+        public boolean shouldRetry(IOException e) {
+          if (e instanceof GoogleJsonResponseException) {
+            int statusCode = ((GoogleJsonResponseException) e).getStatusCode();
+            return statusCode == 408 // Request Timeout
+                || statusCode == 429 // Too many requests
+                || (statusCode >= 500 && statusCode < 600); // Server errors
+          }
+          return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
+        }
+      };
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Client for the GCS API. Not final so that tests can replace it via setStorageClient. */
+  private Storage storageClient;
+
+  // Produces a new BatchInterface per call; replaceable in tests via setBatchRequestSupplier.
+  private Supplier<BatchInterface> batchRequestSupplier;
+
+  // Initializer handed to storageClient.batch(...) when a batch request is created.
+  private final HttpRequestInitializer httpRequestInitializer;
+  /** Buffer size for GCS uploads (in bytes), or {@code null} if not configured. */
+  private final @Nullable Integer uploadBufferSizeBytes;
+
+  // Helper delegate for turning IOExceptions from API calls into higher-level semantics.
+  private final ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
+
+  // Unbounded thread pool for codependent pipeline operations that will deadlock the pipeline if
+  // starved for threads.
+  // Exposed for testing.
+  // NOTE(review): the executor is supplied by the caller; this class does not enforce that it is
+  // unbounded.
+  final ExecutorService executorService;
+
+  // Credentials used to construct the gcsio GoogleCloudStorage client.
+  private final Credentials credentials;
+
+  // gcsio client and its options; both can be swapped by the setCloudStorageImpl test hooks.
+  private GoogleCloudStorage googleCloudStorage;
+  private GoogleCloudStorageOptions googleCloudStorageOptions;
+
+  // Maximum number of requests per batch for rewrite (data-copying) operations.
+  private final int rewriteDataOpBatchLimit;
+
+  // Read/write metric counter prefixes consulted by wrapInCounting.
+  private final GcsCountersOptions gcsCountersOptions;
+
+  /** Rewrite operation setting. For testing purposes only. */
+  @VisibleForTesting @Nullable Long maxBytesRewrittenPerCall;
+
+  // Test hook, initialized to null by the constructor. NOTE(review): presumably counts rewrite
+  // tokens consumed by rewrite calls — confirm against the rewrite code.
+  @VisibleForTesting @Nullable AtomicInteger numRewriteTokensUsed;
+
+  /** Returns the prefix portion of the glob that doesn't contain wildcards. */
+  public static String getNonWildcardPrefix(String globExp) {
+    Matcher m = GLOB_PREFIX.matcher(globExp);
+    checkArgument(m.matches(), String.format("Glob expression: [%s] is not 
expandable.", globExp));
+    return m.group("PREFIX");
+  }
+
+  /** Returns true if the given {@code spec} contains wildcard. */
+  public static boolean isWildcard(GcsPath spec) {
+    return GLOB_PREFIX.matcher(spec.getObject()).matches();
+  }
+
+  /**
+   * Convenience constructor that takes the read-channel options from {@code
+   * gcsOptions.getGoogleCloudStorageReadOptions()}; every other argument is forwarded unchanged to
+   * the primary constructor.
+   */
+  @VisibleForTesting
+  GcsUtilLegacy(
+      Storage storageClient,
+      HttpRequestInitializer httpRequestInitializer,
+      ExecutorService executorService,
+      Boolean shouldUseGrpc,
+      Credentials credentials,
+      @Nullable Integer uploadBufferSizeBytes,
+      @Nullable Integer rewriteDataOpBatchLimit,
+      GcsCountersOptions gcsCountersOptions,
+      GcsOptions gcsOptions) {
+    this(
+        storageClient, httpRequestInitializer, executorService, shouldUseGrpc, credentials,
+        uploadBufferSizeBytes, rewriteDataOpBatchLimit, gcsCountersOptions,
+        /* gcsReadOptions= */ gcsOptions.getGoogleCloudStorageReadOptions());
+  }
+
+  /**
+   * Primary constructor.
+   *
+   * @param storageClient JSON API client; also used to create batch requests
+   * @param httpRequestInitializer initializer passed to {@code storageClient.batch(...)}
+   * @param executorService executor stored in the {@code executorService} field
+   * @param shouldUseGrpc passed to {@code GoogleCloudStorageOptions.Builder#setGrpcEnabled}
+   * @param credentials credentials used to create the gcsio {@link GoogleCloudStorage} client
+   * @param uploadBufferSizeBytes upload buffer size in bytes, or {@code null}
+   * @param rewriteDataOpBatchLimit max requests per rewrite batch; {@code null} falls back to
+   *     {@code MAX_REQUESTS_PER_COPY_BATCH}
+   * @param gcsCountersOptions read/write metric counter prefixes
+   * @param gcsReadOptions read-channel options placed into the gcsio client options
+   */
+  @VisibleForTesting
+  GcsUtilLegacy(
+      Storage storageClient,
+      HttpRequestInitializer httpRequestInitializer,
+      ExecutorService executorService,
+      Boolean shouldUseGrpc,
+      Credentials credentials,
+      @Nullable Integer uploadBufferSizeBytes,
+      @Nullable Integer rewriteDataOpBatchLimit,
+      GcsCountersOptions gcsCountersOptions,
+      GoogleCloudStorageReadOptions gcsReadOptions) {
+    this.storageClient = storageClient;
+    this.httpRequestInitializer = httpRequestInitializer;
+    this.uploadBufferSizeBytes = uploadBufferSizeBytes;
+    this.executorService = executorService;
+    this.credentials = credentials;
+    // Rewrite test hooks start out unset.
+    this.maxBytesRewrittenPerCall = null;
+    this.numRewriteTokensUsed = null;
+    // gcsio options: fixed app name "Beam", caller-supplied read options and gRPC toggle.
+    googleCloudStorageOptions =
+        GoogleCloudStorageOptions.builder()
+            .setAppName("Beam")
+            .setReadChannelOptions(gcsReadOptions)
+            .setGrpcEnabled(shouldUseGrpc)
+            .build();
+    // Must run after googleCloudStorageOptions is assigned, since it is passed in here.
+    googleCloudStorage =
+        createGoogleCloudStorage(googleCloudStorageOptions, storageClient, credentials);
+    // Each get() builds a fresh BatchInterface wrapping a new BatchRequest.
+    this.batchRequestSupplier =
+        () -> {
+          // Capture reference to this so that the most recent storageClient and initializer
+          // are used. The fields are read when get() runs, so a client installed later via
+          // setStorageClient is picked up.
+          GcsUtilLegacy util = this;
+          return new BatchInterface() {
+            final BatchRequest batch = util.storageClient.batch(util.httpRequestInitializer);
+
+            @Override
+            public <T> void queue(
+                AbstractGoogleJsonClientRequest<T> request, JsonBatchCallback<T> cb)
+                throws IOException {
+              request.queue(batch, cb);
+            }
+
+            @Override
+            public void execute() throws IOException {
+              batch.execute();
+            }
+
+            @Override
+            public int size() {
+              return batch.size();
+            }
+          };
+        };
+    // Fall back to the default copy-batch size when no limit is configured.
+    this.rewriteDataOpBatchLimit =
+        rewriteDataOpBatchLimit == null ? MAX_REQUESTS_PER_COPY_BATCH : rewriteDataOpBatchLimit;
+    this.gcsCountersOptions = gcsCountersOptions;
+  }
+
+  /** Replaces the JSON API client used by this instance. Intended for tests only. */
+  protected void setStorageClient(Storage storageClient) {
+    GcsUtilLegacy.this.storageClient = storageClient;
+  }
+
+  /** Replaces the factory used to create batch requests. Intended for tests only. */
+  protected void setBatchRequestSupplier(Supplier<BatchInterface> supplier) {
+    GcsUtilLegacy.this.batchRequestSupplier = supplier;
+  }
+
+  /**
+   * Expands a pattern into matched paths. The pattern path may contain globs, 
which are expanded in
+   * the result. For patterns that only match a single object, we ensure that 
the object exists.
+   */
+  public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
+    Pattern p = null;
+    String prefix = null;
+    if (isWildcard(gcsPattern)) {
+      // Part before the first wildcard character.
+      prefix = getNonWildcardPrefix(gcsPattern.getObject());
+      p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject()));
+    } else {
+      // Not a wildcard.
+      try {
+        // Use a get request to fetch the metadata of the object, and ignore 
the return value.
+        // The request has strong global consistency.
+        getObject(gcsPattern);
+        return ImmutableList.of(gcsPattern);
+      } catch (FileNotFoundException e) {
+        // If the path was not found, return an empty list.
+        return ImmutableList.of();
+      }
+    }
+
+    LOG.debug(
+        "matching files in bucket {}, prefix {} against pattern {}",
+        gcsPattern.getBucket(),
+        prefix,
+        p.toString());
+
+    String pageToken = null;
+    List<GcsPath> results = new ArrayList<>();
+    do {
+      Objects objects = listObjects(gcsPattern.getBucket(), prefix, pageToken);
+      if (objects.getItems() == null) {
+        break;
+      }
+
+      // Filter objects based on the regex.
+      for (StorageObject o : objects.getItems()) {
+        String name = o.getName();
+        // Skip directories, which end with a slash.
+        if (p.matcher(name).matches() && !name.endsWith("/")) {
+          LOG.debug("Matched object: {}", name);
+          results.add(GcsPath.fromObject(o));
+        }
+      }
+      pageToken = objects.getNextPageToken();
+    } while (pageToken != null);
+
+    return results;
+  }
+
+  /** Returns the configured GCS upload buffer size in bytes, or {@code null} if none was set. */
+  @VisibleForTesting
+  @Nullable
+  Integer getUploadBufferSizeBytes() {
+    return this.uploadBufferSizeBytes;
+  }
+
+  /** Creates a fresh GCP-client {@link BackOff} adapted from a new {@code BACKOFF_FACTORY} backoff. */
+  private static BackOff createBackOff() {
+    return BackOffAdapter.toGcpBackOff(
+        BACKOFF_FACTORY.backoff());
+  }
+
+  /**
+   * Returns the file size from GCS or throws {@link FileNotFoundException} if 
the resource does not
+   * exist.
+   */
+  public long fileSize(GcsPath path) throws IOException {
+    return getObject(path).getSize().longValue();
+  }
+
+  /** Returns the {@link StorageObject} for the given {@link GcsPath}. */
+  public StorageObject getObject(GcsPath gcsPath) throws IOException {
+    return getObject(gcsPath, createBackOff(), Sleeper.DEFAULT);
+  }
+
+  /**
+   * Fetches object metadata, retrying socket errors according to {@code backoff} and {@code
+   * sleeper}.
+   *
+   * @throws FileNotFoundException if the object does not exist
+   * @throws IOException for any other failure, including interruption (in which case the thread's
+   *     interrupt flag is restored first)
+   */
+  @VisibleForTesting
+  StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throws IOException {
+    Storage.Objects.Get request =
+        storageClient.objects().get(gcsPath.getBucket(), gcsPath.getObject());
+    try {
+      return ResilientOperation.retry(
+          request::execute, backoff, RetryDeterminer.SOCKET_ERRORS, IOException.class, sleeper);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException(
+          String.format("Unable to get the file object for path %s.", gcsPath), e);
+    } catch (IOException e) {
+      if (errorExtractor.itemNotFound(e)) {
+        throw new FileNotFoundException(gcsPath.toString());
+      }
+      throw new IOException(
+          String.format("Unable to get the file object for path %s.", gcsPath), e);
+    }
+  }
+
+  /**
+   * Returns {@link StorageObjectOrIOException StorageObjectOrIOExceptions} for the given {@link
+   * GcsPath GcsPaths}.
+   *
+   * <p>An empty input yields an empty list. A single path is fetched directly, and any failure for
+   * it is returned inside the element rather than thrown. Two or more paths are fetched through
+   * batch requests.
+   *
+   * @throws IOException as propagated from the batched lookup (multi-path case only)
+   */
+  public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths) throws IOException {
+    if (gcsPaths.isEmpty()) {
+      return ImmutableList.of();
+    } else if (gcsPaths.size() == 1) {
+      GcsPath path = gcsPaths.get(0);
+      try {
+        StorageObject object = getObject(path);
+        return ImmutableList.of(StorageObjectOrIOException.create(object));
+      } catch (IOException e) {
+        return ImmutableList.of(StorageObjectOrIOException.create(e));
+      } catch (Exception e) {
+        // Chain the original exception as the cause so its type and stack trace are not lost.
+        IOException ioException =
+            new IOException(String.format("Error trying to get %s: %s", path, e), e);
+        return ImmutableList.of(StorageObjectOrIOException.create(ioException));
+      }
+    }
+
+    List<StorageObjectOrIOException[]> results = new ArrayList<>();
+    executeBatches(makeGetBatches(gcsPaths, results));
+    ImmutableList.Builder<StorageObjectOrIOException> ret = ImmutableList.builder();
+    for (StorageObjectOrIOException[] result : results) {
+      ret.add(result[0]);
+    }
+    return ret.build();
+  }
+
+  /**
+   * Lists one page of objects whose names start with {@code prefix}, passing a {@code null}
+   * delimiter so that objects in nested "sub-directories" are included.
+   */
+  public Objects listObjects(String bucket, String prefix, @Nullable String pageToken)
+      throws IOException {
+    final @Nullable String noDelimiter = null;
+    return listObjects(bucket, prefix, pageToken, noDelimiter);
+  }
+
+  /**
+   * Lists {@link Objects} given the {@code bucket}, {@code prefix}, {@code pageToken} and optional
+   * {@code delimiter}.
+   *
+   * <p>At most {@code MAX_LIST_ITEMS_PER_CALL} items are requested per call, and socket errors are
+   * retried with the default backoff. For more details, see
+   * https://cloud.google.com/storage/docs/json_api/v1/objects/list.
+   *
+   * @throws IOException wrapping any failure of the list call; if the thread was interrupted while
+   *     retrying, its interrupt flag is restored before throwing
+   */
+  public Objects listObjects(
+      String bucket, String prefix, @Nullable String pageToken, @Nullable String delimiter)
+      throws IOException {
+    // List all objects that start with the prefix (including objects in sub-directories).
+    Storage.Objects.List listObject = storageClient.objects().list(bucket);
+    listObject.setMaxResults(MAX_LIST_ITEMS_PER_CALL);
+    listObject.setPrefix(prefix);
+    listObject.setDelimiter(delimiter);
+
+    if (pageToken != null) {
+      listObject.setPageToken(pageToken);
+    }
+
+    try {
+      return ResilientOperation.retry(
+          listObject::execute, createBackOff(), RetryDeterminer.SOCKET_ERRORS, IOException.class);
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Don't swallow the interrupt: restore the flag so callers can still observe it.
+        Thread.currentThread().interrupt();
+      }
+      throw new IOException(
+          String.format("Unable to match files in bucket %s, prefix %s.", bucket, prefix), e);
+    }
+  }
+
+  /**
+   * Returns the file sizes of {@code paths} from GCS, in the order of the results returned by
+   * {@code getObjects}.
+   *
+   * @throws IOException the first per-path exception encountered (for example a {@link
+   *     FileNotFoundException} for a missing object)
+   */
+  @VisibleForTesting
+  List<Long> fileSizes(List<GcsPath> paths) throws IOException {
+    List<StorageObjectOrIOException> lookups = getObjects(paths);
+    ImmutableList.Builder<Long> sizes = ImmutableList.builder();
+    Iterator<StorageObjectOrIOException> it = lookups.iterator();
+    while (it.hasNext()) {
+      sizes.add(toFileSize(it.next()));
+    }
+    return sizes.build();
+  }
+
+  /** Unwraps a lookup result into its object size, rethrowing the captured exception if present. */
+  private Long toFileSize(StorageObjectOrIOException storageObjectOrIOException)
+      throws IOException {
+    IOException failure = storageObjectOrIOException.ioException();
+    if (failure != null) {
+      throw failure;
+    }
+    return storageObjectOrIOException.storageObject().getSize().longValue();
+  }
+
+  /** Replaces the gcsio {@link GoogleCloudStorage} client. Intended for tests only. */
+  @VisibleForTesting
+  void setCloudStorageImpl(GoogleCloudStorage g) {
+    this.googleCloudStorage = g;
+  }
+
+  /**
+   * Replaces the stored {@link GoogleCloudStorageOptions} (despite the method name, this sets the
+   * options, not the client). Intended for tests only.
+   */
+  @VisibleForTesting
+  void setCloudStorageImpl(GoogleCloudStorageOptions g) {
+    this.googleCloudStorageOptions = g;
+  }
+
+  /**
+   * Creates an integer consumer that increments the counter named {@code <prefix>_<bucket>}.
+   *
+   * <p>The counter is registered under the {@link GcsUtil} namespace rather than {@link
+   * GcsUtilLegacy}. NOTE(review): presumably to keep metric names stable across the GcsUtil split
+   * — confirm before changing.
+   */
+  private static Consumer<Integer> createCounterConsumer(String counterNamePrefix, String bucket) {
+    String counterName = String.format("%s_%s", counterNamePrefix, bucket);
+    return Metrics.counter(GcsUtil.class, counterName)::inc;
+  }
+
+  /**
+   * Wraps {@code writableByteChannel} in a {@link CountingWritableByteChannel} that feeds the write
+   * counter for {@code bucket}. The channel is returned unchanged if it is already counting or if
+   * no write counter prefix is configured.
+   */
+  private WritableByteChannel wrapInCounting(
+      WritableByteChannel writableByteChannel, String bucket) {
+    if (writableByteChannel instanceof CountingWritableByteChannel) {
+      return writableByteChannel;
+    }
+    String writePrefix = gcsCountersOptions.getWriteCounterPrefix();
+    if (writePrefix == null) {
+      return writableByteChannel;
+    }
+    LOG.debug(
+        "wrapping writable byte channel using counter name prefix {} and bucket {}",
+        writePrefix,
+        bucket);
+    return new CountingWritableByteChannel(
+        writableByteChannel, createCounterConsumer(writePrefix, bucket));
+  }
+
+  /**
+   * Wraps {@code seekableByteChannel} in a {@link CountingSeekableByteChannel} for {@code bucket}.
+   *
+   * <p>The channel is returned unchanged if it is already counting or if neither a read nor a write
+   * counter prefix is configured. Otherwise, each side whose prefix is set gets a counter consumer,
+   * and the other side receives {@code null}.
+   */
+  private SeekableByteChannel wrapInCounting(
+      SeekableByteChannel seekableByteChannel, String bucket) {
+    boolean alreadyCounting = seekableByteChannel instanceof CountingSeekableByteChannel;
+    if (alreadyCounting || !gcsCountersOptions.hasAnyPrefix()) {
+      return seekableByteChannel;
+    }
+
+    Consumer<Integer> bytesReadCounter = null;
+    String readPrefix = gcsCountersOptions.getReadCounterPrefix();
+    if (readPrefix != null) {
+      LOG.debug(
+          "wrapping seekable byte channel with \"bytes read\" counter name prefix {}"
+              + " and bucket {}",
+          readPrefix,
+          bucket);
+      bytesReadCounter = createCounterConsumer(readPrefix, bucket);
+    }
+
+    Consumer<Integer> bytesWrittenCounter = null;
+    String writePrefix = gcsCountersOptions.getWriteCounterPrefix();
+    if (writePrefix != null) {
+      LOG.debug(
+          "wrapping seekable byte channel with \"bytes written\" counter name prefix {}"
+              + " and bucket {}",
+          writePrefix,
+          bucket);
+      bytesWrittenCounter = createCounterConsumer(writePrefix, bucket);
+    }
+
+    return new CountingSeekableByteChannel(
+        seekableByteChannel, bytesReadCounter, bytesWrittenCounter);
+  }
+
+  /**
+   * Opens an object in GCS.
+   *
+   * <p>Returns a SeekableByteChannel that provides access to data in the 
bucket.
+   *
+   * @param path the GCS filename to read from
+   * @return a SeekableByteChannel that can read the object data
+   */
+  public SeekableByteChannel open(GcsPath path) throws IOException {
+    return open(path, this.googleCloudStorageOptions.getReadChannelOptions());

Review Comment:
   Compared with the previous implementation
   (https://github.com/apache/beam/blob/b20ccbfe3efa147ea61b32438a44a31133046b4f/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java#L593-L600),
   we will invoke the open() function below so that the IO request counters are included,
   similar to the create() functions below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to