mosche commented on a change in pull request #16607:
URL: https://github.com/apache/beam/pull/16607#discussion_r792593619
##########
File path: sdks/java/io/amazon-web-services2/build.gradle
##########
@@ -78,3 +78,8 @@ test {
])
maxParallelForks 4
}
+
+task writeClasspath {
Review comment:
Was this change committed intentionally?
##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystemRegistrar.java
##########
@@ -20,23 +20,39 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.service.AutoService;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.FileSystemRegistrar;
-import org.apache.beam.sdk.io.aws2.options.S3Options;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
-/** {@link AutoService} registrar for the {@link S3FileSystem}. */
+/**
+ * {@link AutoService} registrar for the {@link S3FileSystem}.
+ *
+ * <p>Creates instances of {@link S3FileSystem} for each scheme registered with a {@link
+ * S3FileSystemSchemeRegistrar}.
+ */
@AutoService(FileSystemRegistrar.class)
@Experimental(Kind.FILESYSTEM)
public class S3FileSystemRegistrar implements FileSystemRegistrar {
@Override
public Iterable<FileSystem<?>> fromOptions(@Nonnull PipelineOptions options) {
checkNotNull(options, "Expect the runner have called FileSystems.setDefaultPipelineOptions().");
- return ImmutableList.of(new S3FileSystem(options.as(S3Options.class)));
+ Map<String, FileSystem<?>> fileSystems =
+ Streams.stream(
+ ServiceLoader.load(
+ S3FileSystemSchemeRegistrar.class, ReflectHelpers.findClassLoader()))
+ .flatMap(r -> Streams.stream(r.fromOptions(options)))
+ .map(S3FileSystem::new)
+ // Throws IllegalStateException if any duplicate schemes exist.
+ .collect(Collectors.toMap(S3FileSystem::getScheme, f -> (FileSystem<?>) f));
Review comment:
For readability, maybe use `Function.identity()` as the value mapper?
Alternatively, you could drop the preceding `.map(S3FileSystem::new)` step and use
```java
Collectors.toMap(S3FileSystemConfiguration::getScheme, S3FileSystem::new)
```
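To make the two variants concrete, here is a minimal, self-contained sketch. `Config` and `Fs` are hypothetical stand-ins for `S3FileSystemConfiguration` and `S3FileSystem` (they are not the real Beam classes), so treat this as an illustration of the `Collectors.toMap` usage only. In both variants the two-argument `toMap` still throws `IllegalStateException` on duplicate keys.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical stand-ins for the PR's classes; not the real Beam types. */
final class ToMapSketch {

  /** Stand-in for S3FileSystemConfiguration: only carries a scheme. */
  static final class Config {
    private final String scheme;

    Config(String scheme) {
      this.scheme = scheme;
    }

    String getScheme() {
      return scheme;
    }
  }

  /** Stand-in for S3FileSystem, constructed from a Config. */
  static final class Fs {
    private final Config config;

    Fs(Config config) {
      this.config = config;
    }

    String getScheme() {
      return config.getScheme();
    }
  }

  /** Variant 1: map to Fs first, then key by scheme with Function.identity(). */
  static Map<String, Fs> viaIdentity(List<Config> configs) {
    return configs.stream()
        .map(Fs::new)
        .collect(Collectors.toMap(Fs::getScheme, Function.identity()));
  }

  /** Variant 2: key by the config's scheme and construct the Fs in the value mapper. */
  static Map<String, Fs> viaConstructor(List<Config> configs) {
    return configs.stream().collect(Collectors.toMap(Config::getScheme, Fs::new));
  }

  public static void main(String[] args) {
    List<Config> configs = Arrays.asList(new Config("s3"), new Config("s3a"));
    System.out.println(viaIdentity(configs).keySet());
    System.out.println(viaConstructor(configs).keySet());
    try {
      // Two configurations with the same scheme: toMap rejects the duplicate key.
      viaConstructor(Arrays.asList(new Config("s3"), new Config("s3")));
    } catch (IllegalStateException e) {
      System.out.println("duplicate scheme rejected");
    }
  }
}
```

Variant 2 saves one stream stage; variant 1 keeps the construction step visible. Either way the duplicate-scheme check comes for free from `toMap`.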
##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystemConfiguration.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.s3;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.io.aws2.options.S3ClientBuilderFactory;
+import org.apache.beam.sdk.io.aws2.options.S3Options;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+
+/**
+ * Object used to configure {@link S3FileSystem}.
+ *
+ * @see S3Options
+ * @see S3FileSystemSchemeRegistrar
+ */
+@AutoValue
+@Experimental(Kind.FILESYSTEM)
+public abstract class S3FileSystemConfiguration {
+ public static final int MINIMUM_UPLOAD_BUFFER_SIZE_BYTES = 5_242_880;
+
+ /** The uri scheme used by resources on this filesystem. */
+ public abstract String getScheme();
+
+ /** The AWS S3 storage class used for creating S3 objects. */
+ public abstract String getS3StorageClass();
+
+ /** Size of S3 upload chunks. */
+ public abstract int getS3UploadBufferSizeBytes();
+
+ /** Thread pool size, limiting the max concurrent S3 operations. */
+ public abstract int getS3ThreadPoolSize();
+
+ /** Algorithm for SSE-S3 encryption, e.g. AES256. */
+ public abstract @Nullable String getSSEAlgorithm();
+
+ /** SSE key for SSE-C encryption, e.g. a base64 encoded key and the algorithm. */
+ public abstract SSECustomerKey getSSECustomerKey();
+
+ /** KMS key id for SSE-KMS encryption, e.g. "arn:aws:kms:..." */
+ public abstract @Nullable String getSSEKMSKeyId();
+
+ /** Builder used to create the {@code S3Client}. */
+ public abstract S3ClientBuilder getS3ClientBuilder();
+
+ /** Creates a new uninitialized {@link Builder}. */
+ public static Builder builder() {
+ return new AutoValue_S3FileSystemConfiguration.Builder();
+ }
+
+ /** Creates a new {@link Builder} with values initialized by this instance's properties. */
+ public abstract Builder toBuilder();
+
+ /**
+ * Creates a new {@link Builder} with values initialized by the properties of {@code s3Options}.
+ */
+ public static Builder fromS3Options(S3Options s3Options) {
+ return builder()
+ .setScheme("s3")
+ .setS3StorageClass(s3Options.getS3StorageClass())
+ .setS3UploadBufferSizeBytes(s3Options.getS3UploadBufferSizeBytes())
+ .setS3ThreadPoolSize(s3Options.getS3ThreadPoolSize())
+ .setSSEAlgorithm(s3Options.getSSEAlgorithm())
+ .setSSECustomerKey(s3Options.getSSECustomerKey())
+ .setSSEKMSKeyId(s3Options.getSSEKMSKeyId())
+ .setS3ClientBuilder(getBuilder(s3Options));
+ }
+
+ /** Creates a new {@link S3ClientBuilder} as specified by {@code s3Options}. */
+ public static S3ClientBuilder getBuilder(S3Options s3Options) {
Review comment:
Should this be private? It looks like an internal helper that is
probably better not exposed publicly...
##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystemConfiguration.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws2.s3;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.io.aws2.options.S3ClientBuilderFactory;
+import org.apache.beam.sdk.io.aws2.options.S3Options;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+
+/**
+ * Object used to configure {@link S3FileSystem}.
+ *
+ * @see S3Options
+ * @see S3FileSystemSchemeRegistrar
+ */
+@AutoValue
+@Experimental(Kind.FILESYSTEM)
+public abstract class S3FileSystemConfiguration {
+ public static final int MINIMUM_UPLOAD_BUFFER_SIZE_BYTES = 5_242_880;
+
+ /** The uri scheme used by resources on this filesystem. */
+ public abstract String getScheme();
+
+ /** The AWS S3 storage class used for creating S3 objects. */
+ public abstract String getS3StorageClass();
+
+ /** Size of S3 upload chunks. */
+ public abstract int getS3UploadBufferSizeBytes();
+
+ /** Thread pool size, limiting the max concurrent S3 operations. */
+ public abstract int getS3ThreadPoolSize();
+
+ /** Algorithm for SSE-S3 encryption, e.g. AES256. */
+ public abstract @Nullable String getSSEAlgorithm();
+
+ /** SSE key for SSE-C encryption, e.g. a base64 encoded key and the algorithm. */
+ public abstract SSECustomerKey getSSECustomerKey();
+
+ /** KMS key id for SSE-KMS encryption, e.g. "arn:aws:kms:..." */
+ public abstract @Nullable String getSSEKMSKeyId();
+
+ /** Builder used to create the {@code S3Client}. */
+ public abstract S3ClientBuilder getS3ClientBuilder();
+
+ /** Creates a new uninitialized {@link Builder}. */
+ public static Builder builder() {
+ return new AutoValue_S3FileSystemConfiguration.Builder();
+ }
+
+ /** Creates a new {@link Builder} with values initialized by this instance's properties. */
+ public abstract Builder toBuilder();
+
+ /**
+ * Creates a new {@link Builder} with values initialized by the properties of {@code s3Options}.
+ */
+ public static Builder fromS3Options(S3Options s3Options) {
Review comment:
The name `S3FileSystemConfiguration.fromS3Options` suggests that this
returns an `S3FileSystemConfiguration` rather than its builder.
Could it be `builder(S3Options s3Options)`, `builderOf(S3Options s3Options)`
or `builderFrom(S3Options s3Options)` instead? Alternatively, it could return
the configuration object rather than the builder.
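A rough sketch of the two naming options, using hand-written, hypothetical stand-ins (`Options`, `Config`, `Builder`) rather than the AutoValue-generated classes and `S3Options` from the PR. The method names `builderFrom` and `fromOptions` are only illustrations of the suggestion, not existing API.

```java
/** Hypothetical, hand-written stand-ins; the PR itself uses AutoValue and S3Options. */
final class BuilderNamingSketch {

  /** Stand-in for S3Options with a single property. */
  static final class Options {
    private final String storageClass;

    Options(String storageClass) {
      this.storageClass = storageClass;
    }

    String getStorageClass() {
      return storageClass;
    }
  }

  /** Stand-in for S3FileSystemConfiguration. */
  static final class Config {
    private final String scheme;
    private final String storageClass;

    private Config(String scheme, String storageClass) {
      this.scheme = scheme;
      this.storageClass = storageClass;
    }

    String getScheme() {
      return scheme;
    }

    String getStorageClass() {
      return storageClass;
    }

    static Builder builder() {
      return new Builder();
    }

    /** Option A: the name states that a Builder is returned, so callers can still override. */
    static Builder builderFrom(Options options) {
      return builder().setScheme("s3").setStorageClass(options.getStorageClass());
    }

    /** Option B: a from...() factory that returns the finished configuration. */
    static Config fromOptions(Options options) {
      return builderFrom(options).build();
    }
  }

  /** Minimal mutable builder for Config. */
  static final class Builder {
    private String scheme;
    private String storageClass;

    Builder setScheme(String scheme) {
      this.scheme = scheme;
      return this;
    }

    Builder setStorageClass(String storageClass) {
      this.storageClass = storageClass;
      return this;
    }

    Config build() {
      return new Config(scheme, storageClass);
    }
  }

  public static void main(String[] args) {
    Options options = new Options("STANDARD");
    // Option A reads naturally when a value is overridden before building.
    Config custom = Config.builderFrom(options).setScheme("s3a").build();
    // Option B reads naturally when the defaults are used as-is.
    Config plain = Config.fromOptions(options);
    System.out.println(custom.getScheme() + " " + plain.getScheme());
  }
}
```

With either option the call site tells the reader whether a `.build()` is still pending, which is the ambiguity the current name leaves open.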
##########
File path: CHANGES.md
##########
@@ -56,6 +56,7 @@
## I/Os
* Support for stopReadTime on KafkaIO SDF (Java).([BEAM-13171](https://issues.apache.org/jira/browse/BEAM-13171)).
+* Added ability to register URI schemes to use the S3 protocol via FileIO using amazon-web-services2 (amazon-web-services already had this ability). ([BEAM-12435](https://issues.apache.org/jira/browse/BEAM-12435)).
Review comment:
```suggestion
* Added ability to register URI schemes to use the S3 protocol via FileIO using amazon-web-services2 (amazon-web-services already had this ability). ([BEAM-12435](https://issues.apache.org/jira/browse/BEAM-12435), [BEAM-13245](https://issues.apache.org/jira/browse/BEAM-13245)).
```