[ https://issues.apache.org/jira/browse/HADOOP-19399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939208#comment-17939208 ]
ASF GitHub Bot commented on HADOOP-19399: ----------------------------------------- steveloughran commented on code in PR #7443: URL: https://github.com/apache/hadoop/pull/7443#discussion_r2018327439 ########## hadoop-project/pom.xml: ########## @@ -205,6 +205,7 @@ <surefire.fork.timeout>900</surefire.fork.timeout> <aws-java-sdk.version>1.12.720</aws-java-sdk.version> <aws-java-sdk-v2.version>2.25.53</aws-java-sdk-v2.version> + <software.amazon.awssdk.crt.version>0.29.11</software.amazon.awssdk.crt.version> Review Comment: change to `aws-crt-client.version` ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java: ########## @@ -81,6 +81,7 @@ S3Client createS3Client(URI uri, S3AsyncClient createS3AsyncClient(URI uri, S3ClientCreationParameters parameters) throws IOException; + Review Comment: nit: cut ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java: ########## @@ -474,6 +474,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, */ private boolean fipsEnabled; + /** + * Is CRT enabled? + */ + private boolean isCRTEnabled; Review Comment: for now, let's have that bool...config changes can follow. ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java: ########## @@ -59,6 +60,7 @@ public class ClientManagerImpl public static final Logger LOG = LoggerFactory.getLogger(ClientManagerImpl.class); + Review Comment: cut ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java: ########## @@ -175,6 +163,39 @@ public S3AsyncClient createS3AsyncClient( return s3AsyncClientBuilder.build(); } + private S3AsyncClient createS3CrtAsyncClient(URI uri, S3ClientCreationParameters parameters) Review Comment: can we pull all async client creation into its own class, so as to be confident that there's no crt client references elsewhere in the code? 
I don't want it to become mandatory on the classpath ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSRegionEndpointResolver.java: ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.hadoop.fs.s3a.impl; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.awscore.util.AwsHostNameUtils; +import software.amazon.awssdk.regions.Region; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.S3ClientFactory; +import org.apache.hadoop.fs.store.LogExactlyOnce; + +import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION; +import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED; +import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT; +import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_DEFAULT_REGION; +import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT; +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_SECURE_CONNECTIONS; +import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT; +import static org.apache.hadoop.fs.s3a.Constants.SECURE_CONNECTIONS; +import static org.apache.hadoop.util.Preconditions.checkArgument; + +/** + * This class uses Hadoop configurations to resolve endpoint and region information which + * is then set in the SDK clients. + */ +public class AWSRegionEndpointResolver { + + private static final String S3_SERVICE_NAME = "s3"; + + private static final Pattern VPC_ENDPOINT_PATTERN = + Pattern.compile("^(?:.+\\.)?([a-z0-9-]+)\\.vpce\\.amazonaws\\.(?:com|com\\.cn)$"); + + protected static final Logger LOG = + LoggerFactory.getLogger(AWSRegionEndpointResolver.class); + + /** + * A one-off warning of default region chains in use. + */ + private static final LogExactlyOnce WARN_OF_DEFAULT_REGION_CHAIN = + new LogExactlyOnce(LOG); + + /** + * Warning message printed when the SDK Region chain is in use. 
+ */ + private static final String SDK_REGION_CHAIN_IN_USE = + "S3A filesystem client is using" + + " the SDK region resolution chain."; + + /** + * Error message when an endpoint is set with FIPS enabled: {@value}. + */ + @VisibleForTesting + public static final String ERROR_ENDPOINT_WITH_FIPS = + "Non central endpoint cannot be set when " + FIPS_ENDPOINT + " is true"; + + + public static AWSRegionEndpointInformation getEndpointRegionResolution(S3ClientFactory.S3ClientCreationParameters parameters, + Configuration conf) { + final String endpointStr = parameters.getEndpoint(); + final URI endpoint = getS3Endpoint(endpointStr, conf); + + AWSRegionEndpointInformation.Builder builder = new AWSRegionEndpointInformation.Builder(); + + final String configuredRegion = parameters.getRegion(); + Region region = null; + String origin = ""; + + // If the region was configured, set it. + if (configuredRegion != null && !configuredRegion.isEmpty()) { + origin = AWS_REGION; + region = Region.of(configuredRegion); + } + + // FIPs? Log it, then reject any attempt to set an endpoint + final boolean fipsEnabled = parameters.isFipsEnabled(); + if (fipsEnabled) { + LOG.debug("Enabling FIPS mode"); + } + // always setting it guarantees the value is non-null, + // which tests expect. + builder.fipsEnabled(fipsEnabled); + + if (endpoint != null) { + boolean endpointEndsWithCentral = + endpointStr.endsWith(CENTRAL_ENDPOINT); + checkArgument(!fipsEnabled || endpointEndsWithCentral, "%s : %s", + ERROR_ENDPOINT_WITH_FIPS, + endpoint); + + // No region was configured, + // determine the region from the endpoint. + if (region == null) { + region = getS3RegionFromEndpoint(endpointStr, + endpointEndsWithCentral); + if (region != null) { + origin = "endpoint"; + } + } + + // No need to override endpoint with "s3.amazonaws.com". + // Let the client take care of endpoint resolution. 
Overriding + // the endpoint with "s3.amazonaws.com" causes 400 Bad Request + // errors for non-existent buckets and objects. + // ref: https://github.com/aws/aws-sdk-java-v2/issues/4846 + if (!endpointEndsWithCentral) { + builder.withEndpoint(endpoint); + LOG.debug("Setting endpoint to {}", endpoint); + } else { + origin = "central endpoint with cross region access"; + LOG.debug("Enabling cross region access for endpoint {}", + endpointStr); + } + } + + if (region != null) { + builder.withRegion(region); + } else if (configuredRegion == null) { + // no region is configured, and none could be determined from the endpoint. + // Use US_EAST_2 as default. + region = Region.of(AWS_S3_DEFAULT_REGION); + builder.withRegion(region); + origin = "cross region access fallback"; + } else if (configuredRegion.isEmpty()) { + // region configuration was set to empty string. + // allow this if people really want it; it is OK to rely on this + // when deployed in EC2. + WARN_OF_DEFAULT_REGION_CHAIN.warn(SDK_REGION_CHAIN_IN_USE); + LOG.debug(SDK_REGION_CHAIN_IN_USE); + origin = "SDK region chain"; + } + boolean isCrossRegionAccessEnabled = conf.getBoolean(AWS_S3_CROSS_REGION_ACCESS_ENABLED, + AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT); + // s3 cross region access + if (isCrossRegionAccessEnabled) { + builder.crossRegionAccessEnabled(true); + } + LOG.debug("Setting region to {} from {} with cross region access {}", + region, origin, isCrossRegionAccessEnabled); + + return builder.build(); + } + + + protected static URI getS3Endpoint(String endpoint, final Configuration conf) { + + boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS); + + String protocol = secureConnections ? "https" : "http"; + + if (endpoint == null || endpoint.isEmpty()) { + // don't set an endpoint if none is configured, instead let the SDK figure it out. 
+ return null; + } + + if (!endpoint.contains("://")) { + endpoint = String.format("%s://%s", protocol, endpoint); + } + + try { + return new URI(endpoint); Review Comment: need to think of unit tests to break this, e.g https://something/http://something-else ########## hadoop-tools/hadoop-aws/pom.xml: ########## @@ -479,6 +494,11 @@ <artifactId>bundle</artifactId> <scope>compile</scope> </dependency> + <dependency> + <groupId>software.amazon.awssdk.crt</groupId> + <artifactId>aws-crt</artifactId> + <scope>compile</scope> Review Comment: lets leave at provided, but docs to indicate this can be left out if the option is disabled ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSRegionEndpointResolver.java: ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.hadoop.fs.s3a.impl; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.awscore.util.AwsHostNameUtils; +import software.amazon.awssdk.regions.Region; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.S3ClientFactory; +import org.apache.hadoop.fs.store.LogExactlyOnce; + +import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION; +import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED; +import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT; +import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_DEFAULT_REGION; +import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT; +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_SECURE_CONNECTIONS; +import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT; +import static org.apache.hadoop.fs.s3a.Constants.SECURE_CONNECTIONS; +import static org.apache.hadoop.util.Preconditions.checkArgument; + +/** + * This class uses Hadoop configurations to resolve endpoint and region information which + * is then set in the SDK clients. + */ +public class AWSRegionEndpointResolver { + + private static final String S3_SERVICE_NAME = "s3"; + + private static final Pattern VPC_ENDPOINT_PATTERN = + Pattern.compile("^(?:.+\\.)?([a-z0-9-]+)\\.vpce\\.amazonaws\\.(?:com|com\\.cn)$"); + + protected static final Logger LOG = + LoggerFactory.getLogger(AWSRegionEndpointResolver.class); + + /** + * A one-off warning of default region chains in use. + */ + private static final LogExactlyOnce WARN_OF_DEFAULT_REGION_CHAIN = + new LogExactlyOnce(LOG); + + /** + * Warning message printed when the SDK Region chain is in use. 
+ */ + private static final String SDK_REGION_CHAIN_IN_USE = + "S3A filesystem client is using" + + " the SDK region resolution chain."; + + /** + * Error message when an endpoint is set with FIPS enabled: {@value}. + */ + @VisibleForTesting + public static final String ERROR_ENDPOINT_WITH_FIPS = + "Non central endpoint cannot be set when " + FIPS_ENDPOINT + " is true"; + + + public static AWSRegionEndpointInformation getEndpointRegionResolution(S3ClientFactory.S3ClientCreationParameters parameters, + Configuration conf) { + final String endpointStr = parameters.getEndpoint(); + final URI endpoint = getS3Endpoint(endpointStr, conf); + + AWSRegionEndpointInformation.Builder builder = new AWSRegionEndpointInformation.Builder(); + + final String configuredRegion = parameters.getRegion(); + Region region = null; + String origin = ""; + + // If the region was configured, set it. + if (configuredRegion != null && !configuredRegion.isEmpty()) { + origin = AWS_REGION; + region = Region.of(configuredRegion); + } + + // FIPs? Log it, then reject any attempt to set an endpoint + final boolean fipsEnabled = parameters.isFipsEnabled(); + if (fipsEnabled) { + LOG.debug("Enabling FIPS mode"); + } + // always setting it guarantees the value is non-null, + // which tests expect. + builder.fipsEnabled(fipsEnabled); + + if (endpoint != null) { + boolean endpointEndsWithCentral = + endpointStr.endsWith(CENTRAL_ENDPOINT); + checkArgument(!fipsEnabled || endpointEndsWithCentral, "%s : %s", + ERROR_ENDPOINT_WITH_FIPS, + endpoint); + + // No region was configured, + // determine the region from the endpoint. + if (region == null) { + region = getS3RegionFromEndpoint(endpointStr, + endpointEndsWithCentral); + if (region != null) { + origin = "endpoint"; + } + } + + // No need to override endpoint with "s3.amazonaws.com". + // Let the client take care of endpoint resolution. 
Overriding + // the endpoint with "s3.amazonaws.com" causes 400 Bad Request + // errors for non-existent buckets and objects. + // ref: https://github.com/aws/aws-sdk-java-v2/issues/4846 + if (!endpointEndsWithCentral) { + builder.withEndpoint(endpoint); + LOG.debug("Setting endpoint to {}", endpoint); + } else { + origin = "central endpoint with cross region access"; + LOG.debug("Enabling cross region access for endpoint {}", + endpointStr); + } + } + + if (region != null) { + builder.withRegion(region); + } else if (configuredRegion == null) { + // no region is configured, and none could be determined from the endpoint. + // Use US_EAST_2 as default. + region = Region.of(AWS_S3_DEFAULT_REGION); + builder.withRegion(region); + origin = "cross region access fallback"; + } else if (configuredRegion.isEmpty()) { Review Comment: proposed, extra regions * "sdk" which explicitly switches to sdk resolution * "auto" which is "us doing the right thing"; document this may change across releases ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSRegionEndpointResolver.java: ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.hadoop.fs.s3a.impl; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.awscore.util.AwsHostNameUtils; +import software.amazon.awssdk.regions.Region; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.S3ClientFactory; +import org.apache.hadoop.fs.store.LogExactlyOnce; + +import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION; +import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED; +import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT; +import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_DEFAULT_REGION; +import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT; +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_SECURE_CONNECTIONS; +import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT; +import static org.apache.hadoop.fs.s3a.Constants.SECURE_CONNECTIONS; +import static org.apache.hadoop.util.Preconditions.checkArgument; + +/** + * This class uses Hadoop configurations to resolve endpoint and region information which + * is then set in the SDK clients. + */ +public class AWSRegionEndpointResolver { + + private static final String S3_SERVICE_NAME = "s3"; + + private static final Pattern VPC_ENDPOINT_PATTERN = + Pattern.compile("^(?:.+\\.)?([a-z0-9-]+)\\.vpce\\.amazonaws\\.(?:com|com\\.cn)$"); + + protected static final Logger LOG = + LoggerFactory.getLogger(AWSRegionEndpointResolver.class); + + /** + * A one-off warning of default region chains in use. + */ + private static final LogExactlyOnce WARN_OF_DEFAULT_REGION_CHAIN = + new LogExactlyOnce(LOG); + + /** + * Warning message printed when the SDK Region chain is in use. 
+ */ + private static final String SDK_REGION_CHAIN_IN_USE = + "S3A filesystem client is using" + + " the SDK region resolution chain."; + + /** + * Error message when an endpoint is set with FIPS enabled: {@value}. + */ + @VisibleForTesting + public static final String ERROR_ENDPOINT_WITH_FIPS = + "Non central endpoint cannot be set when " + FIPS_ENDPOINT + " is true"; + + + public static AWSRegionEndpointInformation getEndpointRegionResolution(S3ClientFactory.S3ClientCreationParameters parameters, + Configuration conf) { + final String endpointStr = parameters.getEndpoint(); + final URI endpoint = getS3Endpoint(endpointStr, conf); + + AWSRegionEndpointInformation.Builder builder = new AWSRegionEndpointInformation.Builder(); + + final String configuredRegion = parameters.getRegion(); + Region region = null; + String origin = ""; + + // If the region was configured, set it. + if (configuredRegion != null && !configuredRegion.isEmpty()) { + origin = AWS_REGION; + region = Region.of(configuredRegion); + } + + // FIPs? Log it, then reject any attempt to set an endpoint + final boolean fipsEnabled = parameters.isFipsEnabled(); + if (fipsEnabled) { + LOG.debug("Enabling FIPS mode"); + } + // always setting it guarantees the value is non-null, + // which tests expect. + builder.fipsEnabled(fipsEnabled); + + if (endpoint != null) { + boolean endpointEndsWithCentral = + endpointStr.endsWith(CENTRAL_ENDPOINT); + checkArgument(!fipsEnabled || endpointEndsWithCentral, "%s : %s", + ERROR_ENDPOINT_WITH_FIPS, + endpoint); + + // No region was configured, + // determine the region from the endpoint. + if (region == null) { + region = getS3RegionFromEndpoint(endpointStr, + endpointEndsWithCentral); + if (region != null) { + origin = "endpoint"; + } + } + + // No need to override endpoint with "s3.amazonaws.com". + // Let the client take care of endpoint resolution. 
Overriding + // the endpoint with "s3.amazonaws.com" causes 400 Bad Request + // errors for non-existent buckets and objects. + // ref: https://github.com/aws/aws-sdk-java-v2/issues/4846 + if (!endpointEndsWithCentral) { + builder.withEndpoint(endpoint); + LOG.debug("Setting endpoint to {}", endpoint); + } else { + origin = "central endpoint with cross region access"; + LOG.debug("Enabling cross region access for endpoint {}", + endpointStr); + } + } + + if (region != null) { + builder.withRegion(region); + } else if (configuredRegion == null) { + // no region is configured, and none could be determined from the endpoint. + // Use US_EAST_2 as default. + region = Region.of(AWS_S3_DEFAULT_REGION); + builder.withRegion(region); + origin = "cross region access fallback"; + } else if (configuredRegion.isEmpty()) { + // region configuration was set to empty string. + // allow this if people really want it; it is OK to rely on this + // when deployed in EC2. + WARN_OF_DEFAULT_REGION_CHAIN.warn(SDK_REGION_CHAIN_IN_USE); + LOG.debug(SDK_REGION_CHAIN_IN_USE); + origin = "SDK region chain"; + } + boolean isCrossRegionAccessEnabled = conf.getBoolean(AWS_S3_CROSS_REGION_ACCESS_ENABLED, + AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT); + // s3 cross region access + if (isCrossRegionAccessEnabled) { + builder.crossRegionAccessEnabled(true); + } + LOG.debug("Setting region to {} from {} with cross region access {}", + region, origin, isCrossRegionAccessEnabled); + + return builder.build(); + } + + + protected static URI getS3Endpoint(String endpoint, final Configuration conf) { Review Comment: javadoc; highlight failure condition. 
> S3A: Add support for CRT client > ------------------------------- > > Key: HADOOP-19399 > URL: https://issues.apache.org/jira/browse/HADOOP-19399 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs/s3 > Affects Versions: 3.4.1 > Reporter: Ahmar Suhail > Priority: Major > Labels: pull-request-available > Fix For: 3.4.2 > > > * Allow ClientManager to initialise a CRT client > * Should be optional, java async client to still be supported as default -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org