steveloughran commented on code in PR #5767:
URL: https://github.com/apache/hadoop/pull/5767#discussion_r1269171373
##########
hadoop-project/pom.xml:
##########
@@ -1145,6 +1146,17 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>software.amazon.encryption.s3</groupId>
Review Comment:
does this come in bundle.jar?
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -98,10 +97,10 @@
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import software.amazon.encryption.s3.materials.KmsKeyring;
Review Comment:
should be with the rest of the imports
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/GetS3RegionOperation.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
+import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
+import org.apache.hadoop.fs.s3a.UnknownStoreException;
+import org.apache.hadoop.fs.store.LogExactlyOnce;
+import org.apache.hadoop.util.DurationInfo;
+
+import static org.apache.hadoop.fs.s3a.Constants.BUCKET_REGION_HEADER;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_BUCKET_PREFIX;
+import static org.apache.hadoop.fs.s3a.Statistic.STORE_REGION_PROBE;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_301_MOVED_PERMANENTLY;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
+import static
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
+
+/**
+ * Operation to get the S3 region for a bucket.
+ * If a region is not specified in {@link
org.apache.hadoop.fs.s3a.Constants#AWS_REGION}, this
+ * operation will warn and then probe S3 to get the region.
+ */
+public class GetS3RegionOperation extends ExecutingStoreOperation<Region> {
+
+ private final AWSCredentialProviderList credentials;
+
+ private final String region;
+
+ private final String bucket;
+
+ private final StoreContext context;
+
+ private final static Map<String, Region> BUCKET_REGIONS = new HashMap<>();
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ GetS3RegionOperation.class);
+
+ /** Exactly once log to warn about setting the region in config to avoid
probe. */
+ private static final LogExactlyOnce SET_REGION_WARNING = new
LogExactlyOnce(LOG);
+
+ public GetS3RegionOperation(
+ final StoreContext context,
+ String region,
+ String bucket,
+ AWSCredentialProviderList credentials) {
+ super(context);
+ this.context = context;
+ this.credentials = credentials;
+ this.region = region;
+ this.bucket = bucket;
+ }
+
+
+ @Override
+ public Region execute() throws IOException {
+
+ if (!StringUtils.isBlank(region)) {
+ return Region.of(region);
+ }
+
+ Region cachedRegion = BUCKET_REGIONS.get(bucket);
+
+ if (cachedRegion != null) {
+ LOG.debug("Got region {} for bucket {} from cache", cachedRegion,
bucket);
+ return cachedRegion;
+ }
+
+ try (DurationInfo ignore =
+ new DurationInfo(LOG, false, "Get S3 region")) {
Review Comment:
include bucket in log message
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/GetS3RegionOperation.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
+import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
+import org.apache.hadoop.fs.s3a.UnknownStoreException;
+import org.apache.hadoop.fs.store.LogExactlyOnce;
+import org.apache.hadoop.util.DurationInfo;
+
+import static org.apache.hadoop.fs.s3a.Constants.BUCKET_REGION_HEADER;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_BUCKET_PREFIX;
+import static org.apache.hadoop.fs.s3a.Statistic.STORE_REGION_PROBE;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_301_MOVED_PERMANENTLY;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
+import static
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
+
+/**
+ * Operation to get the S3 region for a bucket.
+ * If a region is not specified in {@link
org.apache.hadoop.fs.s3a.Constants#AWS_REGION}, this
+ * operation will warn and then probe S3 to get the region.
+ */
+public class GetS3RegionOperation extends ExecutingStoreOperation<Region> {
+
+ private final AWSCredentialProviderList credentials;
+
+ private final String region;
+
+ private final String bucket;
+
+ private final StoreContext context;
+
+ private final static Map<String, Region> BUCKET_REGIONS = new HashMap<>();
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ GetS3RegionOperation.class);
+
+ /** Exactly once log to warn about setting the region in config to avoid
probe. */
+ private static final LogExactlyOnce SET_REGION_WARNING = new
LogExactlyOnce(LOG);
+
+ public GetS3RegionOperation(
+ final StoreContext context,
+ String region,
+ String bucket,
+ AWSCredentialProviderList credentials) {
+ super(context);
+ this.context = context;
+ this.credentials = credentials;
+ this.region = region;
+ this.bucket = bucket;
+ }
+
+
+ @Override
+ public Region execute() throws IOException {
+
+ if (!StringUtils.isBlank(region)) {
+ return Region.of(region);
+ }
+
+ Region cachedRegion = BUCKET_REGIONS.get(bucket);
+
+ if (cachedRegion != null) {
+ LOG.debug("Got region {} for bucket {} from cache", cachedRegion,
bucket);
+ return cachedRegion;
+ }
+
+ try (DurationInfo ignore =
+ new DurationInfo(LOG, false, "Get S3 region")) {
+
+ SET_REGION_WARNING.warn(
Review Comment:
move above the try() just to tune log ordering
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java:
##########
@@ -358,7 +359,7 @@ public void
shouldBeAbleToSwitchOnS3PathStyleAccessViaConfigProperty()
try {
fs = S3ATestUtils.createTestFileSystem(conf);
assertNotNull(fs);
- S3Client s3 = fs.getAmazonS3ClientForTesting("configuration");
+ S3Client s3 = getS3Client("configuration");
Review Comment:
what about just force disabling CSE by removing base/bucket overrides and
setting to false?
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java:
##########
@@ -91,6 +95,43 @@ public S3Client createS3Client(
.build();
}
+ @Override
+ public S3Client createS3EncryptionClient(final S3AsyncClient s3AsyncClient,
+ final S3Client s3Client, final KmsKeyring keyring) {
+
+ return S3EncryptionClient.builder()
+ .wrappedAsyncClient(s3AsyncClient)
+ .wrappedClient(s3Client)
+ .enableLegacyUnauthenticatedModes(true)
+ .keyring(keyring)
Review Comment:
we've had some interest in supporting customer supplied keys too... this
would be the time to add it — or at least leave the extension points for it
open. In particular, that third argument should be a structure which contains
the keyring and some enum of the option to use
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/GetS3RegionOperation.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
+import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
+import org.apache.hadoop.fs.s3a.UnknownStoreException;
+import org.apache.hadoop.fs.store.LogExactlyOnce;
+import org.apache.hadoop.util.DurationInfo;
+
+import static org.apache.hadoop.fs.s3a.Constants.BUCKET_REGION_HEADER;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_BUCKET_PREFIX;
+import static org.apache.hadoop.fs.s3a.Statistic.STORE_REGION_PROBE;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_301_MOVED_PERMANENTLY;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
+import static
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
+
+/**
+ * Operation to get the S3 region for a bucket.
+ * If a region is not specified in {@link
org.apache.hadoop.fs.s3a.Constants#AWS_REGION}, this
+ * operation will warn and then probe S3 to get the region.
+ */
+public class GetS3RegionOperation extends ExecutingStoreOperation<Region> {
+
+ private final AWSCredentialProviderList credentials;
+
+ private final String region;
+
+ private final String bucket;
+
+ private final StoreContext context;
+
+ private final static Map<String, Region> BUCKET_REGIONS = new HashMap<>();
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ GetS3RegionOperation.class);
+
+ /** Exactly once log to warn about setting the region in config to avoid
probe. */
+ private static final LogExactlyOnce SET_REGION_WARNING = new
LogExactlyOnce(LOG);
+
+ public GetS3RegionOperation(
+ final StoreContext context,
+ String region,
+ String bucket,
+ AWSCredentialProviderList credentials) {
+ super(context);
+ this.context = context;
+ this.credentials = credentials;
+ this.region = region;
+ this.bucket = bucket;
+ }
+
+
+ @Override
+ public Region execute() throws IOException {
+
+ if (!StringUtils.isBlank(region)) {
+ return Region.of(region);
+ }
+
+ Region cachedRegion = BUCKET_REGIONS.get(bucket);
+
+ if (cachedRegion != null) {
+ LOG.debug("Got region {} for bucket {} from cache", cachedRegion,
bucket);
+ return cachedRegion;
+ }
+
+ try (DurationInfo ignore =
+ new DurationInfo(LOG, false, "Get S3 region")) {
+
+ SET_REGION_WARNING.warn(
+ "Getting region for bucket {} from S3, this will slow down FS
initialisation. "
+ + "To avoid this, set the region using property {}", bucket,
+ FS_S3A_BUCKET_PREFIX + bucket + ".endpoint.region");
+
+ // build a s3 client with region eu-west-1 that can be used to get the
region of the
+ // bucket. Using eu-west-1, as headBucket() doesn't work with us-east-1.
This is because
+ // us-east-1 uses the endpoint s3.amazonaws.com, which resolves
bucket.s3.amazonaws.com
+ // to the actual region the bucket is in. As the request is signed with
us-east-1 and
+ // not the bucket's region, it fails.
+ S3Client getRegionS3Client =
+
S3Client.builder().region(Region.EU_WEST_1).credentialsProvider(credentials)
+ .build();
+
+ HeadBucketResponse headBucketResponse =
trackDuration(context.getInstrumentation(),
+ STORE_REGION_PROBE.getSymbol(),
+ () ->
getRegionS3Client.headBucket(HeadBucketRequest.builder().bucket(bucket).build()));
+
+ Region bucketRegion = Region.of(
+
headBucketResponse.sdkHttpResponse().headers().get(BUCKET_REGION_HEADER).get(0));
+ BUCKET_REGIONS.put(bucket, bucketRegion);
+
+ return bucketRegion;
+ } catch (S3Exception exception) {
+ if (exception.statusCode() == SC_301_MOVED_PERMANENTLY) {
+ Region bucketRegion = Region.of(
+
exception.awsErrorDetails().sdkHttpResponse().headers().get(BUCKET_REGION_HEADER)
+ .get(0));
+ BUCKET_REGIONS.put(bucket, bucketRegion);
+
+ return bucketRegion;
+ }
+
+ if (exception.statusCode() == SC_404_NOT_FOUND) {
+ throw new UnknownStoreException("s3a://" + bucket + "/",
+ " Bucket does " + "not exist");
Review Comment:
join the strings
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java:
##########
@@ -600,4 +601,17 @@ public static boolean isSTSSignerCalled() {
return stsSignerCalled;
}
}
+
+ private S3Client getS3Client(String reason) throws IllegalAccessException {
Review Comment:
add a javadoc to explain this
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/GetS3RegionOperation.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
+import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
+import org.apache.hadoop.fs.s3a.UnknownStoreException;
+import org.apache.hadoop.fs.store.LogExactlyOnce;
+import org.apache.hadoop.util.DurationInfo;
+
+import static org.apache.hadoop.fs.s3a.Constants.BUCKET_REGION_HEADER;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_BUCKET_PREFIX;
+import static org.apache.hadoop.fs.s3a.Statistic.STORE_REGION_PROBE;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_301_MOVED_PERMANENTLY;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
+import static
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
+
+/**
+ * Operation to get the S3 region for a bucket.
+ * If a region is not specified in {@link
org.apache.hadoop.fs.s3a.Constants#AWS_REGION}, this
+ * operation will warn and then probe S3 to get the region.
+ */
+public class GetS3RegionOperation extends ExecutingStoreOperation<Region> {
+
+ private final AWSCredentialProviderList credentials;
+
+ private final String region;
+
+ private final String bucket;
+
+ private final StoreContext context;
+
+ private final static Map<String, Region> BUCKET_REGIONS = new HashMap<>();
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ GetS3RegionOperation.class);
+
+ /** Exactly once log to warn about setting the region in config to avoid
probe. */
+ private static final LogExactlyOnce SET_REGION_WARNING = new
LogExactlyOnce(LOG);
+
+ public GetS3RegionOperation(
+ final StoreContext context,
+ String region,
+ String bucket,
+ AWSCredentialProviderList credentials) {
+ super(context);
+ this.context = context;
+ this.credentials = credentials;
+ this.region = region;
+ this.bucket = bucket;
+ }
+
+
+ @Override
+ public Region execute() throws IOException {
+
+ if (!StringUtils.isBlank(region)) {
+ return Region.of(region);
+ }
+
+ Region cachedRegion = BUCKET_REGIONS.get(bucket);
+
+ if (cachedRegion != null) {
+ LOG.debug("Got region {} for bucket {} from cache", cachedRegion,
bucket);
+ return cachedRegion;
+ }
+
+ try (DurationInfo ignore =
+ new DurationInfo(LOG, false, "Get S3 region")) {
+
+ SET_REGION_WARNING.warn(
+ "Getting region for bucket {} from S3, this will slow down FS
initialisation. "
+ + "To avoid this, set the region using property {}", bucket,
+ FS_S3A_BUCKET_PREFIX + bucket + ".endpoint.region");
+
+ // build a s3 client with region eu-west-1 that can be used to get the
region of the
+ // bucket. Using eu-west-1, as headBucket() doesn't work with us-east-1.
This is because
+ // us-east-1 uses the endpoint s3.amazonaws.com, which resolves
bucket.s3.amazonaws.com
+ // to the actual region the bucket is in. As the request is signed with
us-east-1 and
+ // not the bucket's region, it fails.
+ S3Client getRegionS3Client =
+
S3Client.builder().region(Region.EU_WEST_1).credentialsProvider(credentials)
Review Comment:
what's going to happen with govcloud or china regions?
I'd propose having a callback interface, as we've done with the other
operations, so this class can be given unit tests with a different
implementation.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java:
##########
@@ -82,6 +83,38 @@ S3Client createS3Client(URI uri,
S3AsyncClient createS3AsyncClient(URI uri,
S3ClientCreationParameters parameters) throws IOException;
+ /**
+ * Creates a new {@link software.amazon.encryption.s3.S3EncryptionClient}.
+ * Used when client side encryption is enabled.
+ *
+ * @param s3AsyncClient The asynchronous S3 client, will be used for
cryptographic operations.
+ * @param s3Client The synchronous S3 client, will be used for non
cryptographic operations.
+ * @param kmsKeyring kms wrapping key to be used
+ * @return S3EncryptionClient
+ */
+ S3Client createS3EncryptionClient(S3AsyncClient s3AsyncClient, S3Client
s3Client,
Review Comment:
as mentioned, encryption options should be our own struct with an option to
say kms keyring; placeholder for client key management
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]