scwhittle commented on code in PR #32769:
URL: https://github.com/apache/beam/pull/32769#discussion_r1812224493


##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java:
##########
@@ -123,6 +125,58 @@ public static GcsCountersOptions create(
     }
   }
 
+  public static class GcsReadOptionsFactory
+      implements DefaultValueFactory<GoogleCloudStorageReadOptions> {
+    @Override
+    public GoogleCloudStorageReadOptions create(PipelineOptions options) {
+      try {
+        // Check if gcs-connector-hadoop is loaded into classpath
+        Class.forName("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration");
+        Configuration config = new Configuration();
+        return GoogleCloudStorageReadOptions.builder()
+            .setFastFailOnNotFound(
+                GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_FAST_FAIL_ON_NOT_FOUND_ENABLE
+                    .get(config, config::getBoolean))
+            .setSupportGzipEncoding(
+                GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_SUPPORT_GZIP_ENCODING_ENABLE
+                    .get(config, config::getBoolean))
+            .setInplaceSeekLimit(
+                GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT.get(
+                    config, config::getLong))
+            .setFadvise(
+                GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_FADVISE.get(
+                    config, config::getEnum))
+            .setMinRangeRequestSize(
+                GoogleHadoopFileSystemConfiguration.GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE.get(
+                    config, config::getInt))
+            .setGrpcChecksumsEnabled(
+                GoogleHadoopFileSystemConfiguration.GCS_GRPC_CHECKSUMS_ENABLE.get(
+                    config, config::getBoolean))
+            .setGrpcReadTimeoutMillis(
+                GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_TIMEOUT_MS.get(
+                    config, config::getLong))
+            .setGrpcReadMessageTimeoutMillis(
+                GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_MESSAGE_TIMEOUT_MS.get(
+                    config, config::getLong))
+            .setGrpcReadMetadataTimeoutMillis(
+                GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_METADATA_TIMEOUT_MS.get(
+                    config, config::getLong))
+            .setGrpcReadZeroCopyEnabled(
+                GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_ZEROCOPY_ENABLE.get(
+                    config, config::getBoolean))
+            .setTraceLogEnabled(
+                GoogleHadoopFileSystemConfiguration.GCS_TRACE_LOG_ENABLE.get(
+                    config, config::getBoolean))
+            .setTraceLogTimeThreshold(
+                GoogleHadoopFileSystemConfiguration.GCS_TRACE_LOG_TIME_THRESHOLD_MS.get(
+                    config, config::getLong))
+            .build();
+      } catch (ClassNotFoundException e) {

Review Comment:
   > Or, I could omit this if/else branch entirely and always return GoogleCloudStorageReadOptions.DEFAULT, and leave it up to the user to supply a GoogleCloudStorageReadOptions instance (thus passing the Hadoop dependency down to the user-end).
   
   I think this would be preferable, since it avoids pulling in the other packages, but I am asking some other Beam maintainers who are more familiar with dependency management to take a look as well.
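
   A minimal sketch of the alternative quoted above, where the default factory never touches Hadoop classes and a user-supplied value takes precedence. All types here (`ReadOptions`, `Options`, `DefaultReadOptionsFactory`) are simplified stand-ins assumed for illustration; they are not the real `GoogleCloudStorageReadOptions`, `PipelineOptions`, or `DefaultValueFactory` types.

```java
// Stand-in types only; they mirror the discussion but are not the real Beam or gcsio classes.
final class DefaultReadOptionsSketch {

  /** Stand-in for GoogleCloudStorageReadOptions; the field value is arbitrary. */
  static final class ReadOptions {
    static final ReadOptions DEFAULT = new ReadOptions(true);
    final boolean fastFailOnNotFound;

    ReadOptions(boolean fastFailOnNotFound) {
      this.fastFailOnNotFound = fastFailOnNotFound;
    }
  }

  /** Stand-in for a default-value factory: no classpath probing, no Hadoop types. */
  static final class DefaultReadOptionsFactory {
    static ReadOptions create() {
      return ReadOptions.DEFAULT;
    }
  }

  /** Stand-in for the pipeline options that hold an optional user-supplied value. */
  static final class Options {
    private ReadOptions readOptions; // null until the user sets it

    void setReadOptions(ReadOptions value) {
      this.readOptions = value;
    }

    ReadOptions getReadOptions() {
      // Fall back to the factory default only when nothing was supplied.
      return readOptions != null ? readOptions : DefaultReadOptionsFactory.create();
    }
  }
}
```

   With this shape, any Hadoop-derived configuration would be built in user code and passed in through the setter, so the Hadoop dependency stays on the user's side.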



##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java:
##########
@@ -249,17 +307,20 @@ public static boolean isWildcard(GcsPath spec) {
       Credentials credentials,
       @Nullable Integer uploadBufferSizeBytes,
       @Nullable Integer rewriteDataOpBatchLimit,
-      GcsCountersOptions gcsCountersOptions) {
+      GcsCountersOptions gcsCountersOptions,
+      GoogleCloudStorageReadOptions gcsReadOptions) {
     this.storageClient = storageClient;
     this.httpRequestInitializer = httpRequestInitializer;
     this.uploadBufferSizeBytes = uploadBufferSizeBytes;
     this.executorService = executorService;
     this.credentials = credentials;
     this.maxBytesRewrittenPerCall = null;
     this.numRewriteTokensUsed = null;
+    this.googleCloudStorageReadOptions = gcsReadOptions;
     googleCloudStorageOptions =
         GoogleCloudStorageOptions.builder()
             .setAppName("Beam")
+            .setReadChannelOptions(this.googleCloudStorageReadOptions)

Review Comment:
   This looks like a bug in the `GoogleCloudStorageImpl.open` method, which doesn't use these options. You can still remove the separate member variable and use `googleCloudStorageOptions.getReadChannelOptions()` in `open` below.
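
   A rough sketch of that suggestion, using stand-in types rather than the real gcsio classes: the read options live only inside the storage options object, and the code path that opens a channel reads them back through the getter. `ReadOptions`, `StorageOptions`, and `readOptionsForOpen` are hypothetical names assumed for illustration.

```java
// Stand-in types only; not the real GcsUtil, GoogleCloudStorageOptions, or read-options classes.
final class GcsUtilSketch {

  /** Stand-in for GoogleCloudStorageReadOptions. */
  static final class ReadOptions {
    final int minRangeRequestSize;

    ReadOptions(int minRangeRequestSize) {
      this.minRangeRequestSize = minRangeRequestSize;
    }
  }

  /** Stand-in for GoogleCloudStorageOptions, which carries the read options. */
  static final class StorageOptions {
    private final String appName;
    private final ReadOptions readChannelOptions;

    StorageOptions(String appName, ReadOptions readChannelOptions) {
      this.appName = appName;
      this.readChannelOptions = readChannelOptions;
    }

    String getAppName() {
      return appName;
    }

    ReadOptions getReadChannelOptions() {
      return readChannelOptions;
    }
  }

  // Single source of truth: no separate read-options field on the utility class.
  private final StorageOptions storageOptions;

  GcsUtilSketch(ReadOptions readOptions) {
    this.storageOptions = new StorageOptions("Beam", readOptions);
  }

  /** What an open(...) call would pass along as its read options. */
  ReadOptions readOptionsForOpen() {
    return storageOptions.getReadChannelOptions();
  }
}
```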


