[
https://issues.apache.org/jira/browse/HADOOP-18708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744989#comment-17744989
]
ASF GitHub Bot commented on HADOOP-18708:
-----------------------------------------
steveloughran commented on code in PR #5767:
URL: https://github.com/apache/hadoop/pull/5767#discussion_r1269171373
##########
hadoop-project/pom.xml:
##########
@@ -1145,6 +1146,17 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>software.amazon.encryption.s3</groupId>
Review Comment:
does this come in bundle.jar?
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -98,10 +97,10 @@
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import software.amazon.encryption.s3.materials.KmsKeyring;
Review Comment:
should be with the rest of the imports
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/GetS3RegionOperation.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
+import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
+import org.apache.hadoop.fs.s3a.UnknownStoreException;
+import org.apache.hadoop.fs.store.LogExactlyOnce;
+import org.apache.hadoop.util.DurationInfo;
+
+import static org.apache.hadoop.fs.s3a.Constants.BUCKET_REGION_HEADER;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_BUCKET_PREFIX;
+import static org.apache.hadoop.fs.s3a.Statistic.STORE_REGION_PROBE;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_301_MOVED_PERMANENTLY;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
+import static
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
+
+/**
+ * Operation to get the S3 region for a bucket.
+ * If a region is not specified in {@link
org.apache.hadoop.fs.s3a.Constants#AWS_REGION}, this
+ * operation will warn and then probe S3 to get the region.
+ */
+public class GetS3RegionOperation extends ExecutingStoreOperation<Region> {
+
+ private final AWSCredentialProviderList credentials;
+
+ private final String region;
+
+ private final String bucket;
+
+ private final StoreContext context;
+
+ private final static Map<String, Region> BUCKET_REGIONS = new HashMap<>();
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ GetS3RegionOperation.class);
+
+ /** Exactly once log to warn about setting the region in config to avoid
probe. */
+ private static final LogExactlyOnce SET_REGION_WARNING = new
LogExactlyOnce(LOG);
+
+ public GetS3RegionOperation(
+ final StoreContext context,
+ String region,
+ String bucket,
+ AWSCredentialProviderList credentials) {
+ super(context);
+ this.context = context;
+ this.credentials = credentials;
+ this.region = region;
+ this.bucket = bucket;
+ }
+
+
+ @Override
+ public Region execute() throws IOException {
+
+ if (!StringUtils.isBlank(region)) {
+ return Region.of(region);
+ }
+
+ Region cachedRegion = BUCKET_REGIONS.get(bucket);
+
+ if (cachedRegion != null) {
+ LOG.debug("Got region {} for bucket {} from cache", cachedRegion,
bucket);
+ return cachedRegion;
+ }
+
+ try (DurationInfo ignore =
+ new DurationInfo(LOG, false, "Get S3 region")) {
Review Comment:
include bucket in log message
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/GetS3RegionOperation.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
+import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
+import org.apache.hadoop.fs.s3a.UnknownStoreException;
+import org.apache.hadoop.fs.store.LogExactlyOnce;
+import org.apache.hadoop.util.DurationInfo;
+
+import static org.apache.hadoop.fs.s3a.Constants.BUCKET_REGION_HEADER;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_BUCKET_PREFIX;
+import static org.apache.hadoop.fs.s3a.Statistic.STORE_REGION_PROBE;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_301_MOVED_PERMANENTLY;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
+import static
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
+
+/**
+ * Operation to get the S3 region for a bucket.
+ * If a region is not specified in {@link
org.apache.hadoop.fs.s3a.Constants#AWS_REGION}, this
+ * operation will warn and then probe S3 to get the region.
+ */
+public class GetS3RegionOperation extends ExecutingStoreOperation<Region> {
+
+ private final AWSCredentialProviderList credentials;
+
+ private final String region;
+
+ private final String bucket;
+
+ private final StoreContext context;
+
+ private final static Map<String, Region> BUCKET_REGIONS = new HashMap<>();
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ GetS3RegionOperation.class);
+
+ /** Exactly once log to warn about setting the region in config to avoid
probe. */
+ private static final LogExactlyOnce SET_REGION_WARNING = new
LogExactlyOnce(LOG);
+
+ public GetS3RegionOperation(
+ final StoreContext context,
+ String region,
+ String bucket,
+ AWSCredentialProviderList credentials) {
+ super(context);
+ this.context = context;
+ this.credentials = credentials;
+ this.region = region;
+ this.bucket = bucket;
+ }
+
+
+ @Override
+ public Region execute() throws IOException {
+
+ if (!StringUtils.isBlank(region)) {
+ return Region.of(region);
+ }
+
+ Region cachedRegion = BUCKET_REGIONS.get(bucket);
+
+ if (cachedRegion != null) {
+ LOG.debug("Got region {} for bucket {} from cache", cachedRegion,
bucket);
+ return cachedRegion;
+ }
+
+ try (DurationInfo ignore =
+ new DurationInfo(LOG, false, "Get S3 region")) {
+
+ SET_REGION_WARNING.warn(
Review Comment:
move above the try() just to tune log ordering
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java:
##########
@@ -358,7 +359,7 @@ public void
shouldBeAbleToSwitchOnS3PathStyleAccessViaConfigProperty()
try {
fs = S3ATestUtils.createTestFileSystem(conf);
assertNotNull(fs);
- S3Client s3 = fs.getAmazonS3ClientForTesting("configuration");
+ S3Client s3 = getS3Client("configuration");
Review Comment:
what about just force disabling CSE by removing base/bucket overrides and
setting to false?
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java:
##########
@@ -91,6 +95,43 @@ public S3Client createS3Client(
.build();
}
+ @Override
+ public S3Client createS3EncryptionClient(final S3AsyncClient s3AsyncClient,
+ final S3Client s3Client, final KmsKeyring keyring) {
+
+ return S3EncryptionClient.builder()
+ .wrappedAsyncClient(s3AsyncClient)
+ .wrappedClient(s3Client)
+ .enableLegacyUnauthenticatedModes(true)
+ .keyring(keyring)
Review Comment:
we've had some interest in supporting customer supplied keys too... this
would be the time to add it - or at least leave the points for it open. In
particular, that third argument should be a structure which contains the
keyring and an enum of the option to use
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/GetS3RegionOperation.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
+import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
+import org.apache.hadoop.fs.s3a.UnknownStoreException;
+import org.apache.hadoop.fs.store.LogExactlyOnce;
+import org.apache.hadoop.util.DurationInfo;
+
+import static org.apache.hadoop.fs.s3a.Constants.BUCKET_REGION_HEADER;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_BUCKET_PREFIX;
+import static org.apache.hadoop.fs.s3a.Statistic.STORE_REGION_PROBE;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_301_MOVED_PERMANENTLY;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
+import static
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
+
+/**
+ * Operation to get the S3 region for a bucket.
+ * If a region is not specified in {@link
org.apache.hadoop.fs.s3a.Constants#AWS_REGION}, this
+ * operation will warn and then probe S3 to get the region.
+ */
+public class GetS3RegionOperation extends ExecutingStoreOperation<Region> {
+
+ private final AWSCredentialProviderList credentials;
+
+ private final String region;
+
+ private final String bucket;
+
+ private final StoreContext context;
+
+ private final static Map<String, Region> BUCKET_REGIONS = new HashMap<>();
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ GetS3RegionOperation.class);
+
+ /** Exactly once log to warn about setting the region in config to avoid
probe. */
+ private static final LogExactlyOnce SET_REGION_WARNING = new
LogExactlyOnce(LOG);
+
+ public GetS3RegionOperation(
+ final StoreContext context,
+ String region,
+ String bucket,
+ AWSCredentialProviderList credentials) {
+ super(context);
+ this.context = context;
+ this.credentials = credentials;
+ this.region = region;
+ this.bucket = bucket;
+ }
+
+
+ @Override
+ public Region execute() throws IOException {
+
+ if (!StringUtils.isBlank(region)) {
+ return Region.of(region);
+ }
+
+ Region cachedRegion = BUCKET_REGIONS.get(bucket);
+
+ if (cachedRegion != null) {
+ LOG.debug("Got region {} for bucket {} from cache", cachedRegion,
bucket);
+ return cachedRegion;
+ }
+
+ try (DurationInfo ignore =
+ new DurationInfo(LOG, false, "Get S3 region")) {
+
+ SET_REGION_WARNING.warn(
+ "Getting region for bucket {} from S3, this will slow down FS
initialisation. "
+ + "To avoid this, set the region using property {}", bucket,
+ FS_S3A_BUCKET_PREFIX + bucket + ".endpoint.region");
+
+ // build a s3 client with region eu-west-1 that can be used to get the
region of the
+ // bucket. Using eu-west-1, as headBucket() doesn't work with us-east-1.
This is because
+ // us-east-1 uses the endpoint s3.amazonaws.com, which resolves
bucket.s3.amazonaws.com
+ // to the actual region the bucket is in. As the request is signed with
us-east-1 and
+ // not the bucket's region, it fails.
+ S3Client getRegionS3Client =
+
S3Client.builder().region(Region.EU_WEST_1).credentialsProvider(credentials)
+ .build();
+
+ HeadBucketResponse headBucketResponse =
trackDuration(context.getInstrumentation(),
+ STORE_REGION_PROBE.getSymbol(),
+ () ->
getRegionS3Client.headBucket(HeadBucketRequest.builder().bucket(bucket).build()));
+
+ Region bucketRegion = Region.of(
+
headBucketResponse.sdkHttpResponse().headers().get(BUCKET_REGION_HEADER).get(0));
+ BUCKET_REGIONS.put(bucket, bucketRegion);
+
+ return bucketRegion;
+ } catch (S3Exception exception) {
+ if (exception.statusCode() == SC_301_MOVED_PERMANENTLY) {
+ Region bucketRegion = Region.of(
+
exception.awsErrorDetails().sdkHttpResponse().headers().get(BUCKET_REGION_HEADER)
+ .get(0));
+ BUCKET_REGIONS.put(bucket, bucketRegion);
+
+ return bucketRegion;
+ }
+
+ if (exception.statusCode() == SC_404_NOT_FOUND) {
+ throw new UnknownStoreException("s3a://" + bucket + "/",
+ " Bucket does " + "not exist");
Review Comment:
join the strings
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java:
##########
@@ -600,4 +601,17 @@ public static boolean isSTSSignerCalled() {
return stsSignerCalled;
}
}
+
+ private S3Client getS3Client(String reason) throws IllegalAccessException {
Review Comment:
add a javadoc to explain this
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/GetS3RegionOperation.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
+import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
+import org.apache.hadoop.fs.s3a.UnknownStoreException;
+import org.apache.hadoop.fs.store.LogExactlyOnce;
+import org.apache.hadoop.util.DurationInfo;
+
+import static org.apache.hadoop.fs.s3a.Constants.BUCKET_REGION_HEADER;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_BUCKET_PREFIX;
+import static org.apache.hadoop.fs.s3a.Statistic.STORE_REGION_PROBE;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_301_MOVED_PERMANENTLY;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
+import static
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
+
+/**
+ * Operation to get the S3 region for a bucket.
+ * If a region is not specified in {@link
org.apache.hadoop.fs.s3a.Constants#AWS_REGION}, this
+ * operation will warn and then probe S3 to get the region.
+ */
+public class GetS3RegionOperation extends ExecutingStoreOperation<Region> {
+
+ private final AWSCredentialProviderList credentials;
+
+ private final String region;
+
+ private final String bucket;
+
+ private final StoreContext context;
+
+ private final static Map<String, Region> BUCKET_REGIONS = new HashMap<>();
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ GetS3RegionOperation.class);
+
+ /** Exactly once log to warn about setting the region in config to avoid
probe. */
+ private static final LogExactlyOnce SET_REGION_WARNING = new
LogExactlyOnce(LOG);
+
+ public GetS3RegionOperation(
+ final StoreContext context,
+ String region,
+ String bucket,
+ AWSCredentialProviderList credentials) {
+ super(context);
+ this.context = context;
+ this.credentials = credentials;
+ this.region = region;
+ this.bucket = bucket;
+ }
+
+
+ @Override
+ public Region execute() throws IOException {
+
+ if (!StringUtils.isBlank(region)) {
+ return Region.of(region);
+ }
+
+ Region cachedRegion = BUCKET_REGIONS.get(bucket);
+
+ if (cachedRegion != null) {
+ LOG.debug("Got region {} for bucket {} from cache", cachedRegion,
bucket);
+ return cachedRegion;
+ }
+
+ try (DurationInfo ignore =
+ new DurationInfo(LOG, false, "Get S3 region")) {
+
+ SET_REGION_WARNING.warn(
+ "Getting region for bucket {} from S3, this will slow down FS
initialisation. "
+ + "To avoid this, set the region using property {}", bucket,
+ FS_S3A_BUCKET_PREFIX + bucket + ".endpoint.region");
+
+ // build a s3 client with region eu-west-1 that can be used to get the
region of the
+ // bucket. Using eu-west-1, as headBucket() doesn't work with us-east-1.
This is because
+ // us-east-1 uses the endpoint s3.amazonaws.com, which resolves
bucket.s3.amazonaws.com
+ // to the actual region the bucket is in. As the request is signed with
us-east-1 and
+ // not the bucket's region, it fails.
+ S3Client getRegionS3Client =
+
S3Client.builder().region(Region.EU_WEST_1).credentialsProvider(credentials)
Review Comment:
what's going to happen with govcloud or china regions?
I'd propose having a callback interface, as we've done with the other
operations, so this class can be given unit tests with a different
implementation.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java:
##########
@@ -82,6 +83,38 @@ S3Client createS3Client(URI uri,
S3AsyncClient createS3AsyncClient(URI uri,
S3ClientCreationParameters parameters) throws IOException;
+ /**
+ * Creates a new {@link software.amazon.encryption.s3.S3EncryptionClient}.
+ * Used when client side encryption is enabled.
+ *
+ * @param s3AsyncClient The asynchronous S3 client, will be used for
cryptographic operations.
+ * @param s3Client The synchronous S3 client, will be used for non
cryptographic operations.
+ * @param kmsKeyring kms wrapping key to be used
+ * @return S3EncryptionClient
+ */
+ S3Client createS3EncryptionClient(S3AsyncClient s3AsyncClient, S3Client
s3Client,
Review Comment:
as mentioned, encryption options should be our own struct with an option to
say kms keyring; placeholder for client key management
> AWS SDK V2 - Implement CSE
> --------------------------
>
> Key: HADOOP-18708
> URL: https://issues.apache.org/jira/browse/HADOOP-18708
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 3.4.0
> Reporter: Ahmar Suhail
> Priority: Major
> Labels: pull-request-available
>
> S3 Encryption client for SDK V2 is now available, so add client side
> encryption back in.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]