This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 09c737656d AWS: Add configuration and set defaults for S3 retry
behaviour (#11052)
09c737656d is described below
commit 09c737656d316ab6172e0d5ee6920869237e6fd1
Author: Ozan Okumusoglu <[email protected]>
AuthorDate: Tue Oct 1 15:28:52 2024 -0700
AWS: Add configuration and set defaults for S3 retry behaviour (#11052)
Co-authored-by: Drew Schleit <[email protected]>
---
.../iceberg/aws/AssumeRoleAwsClientFactory.java | 1 +
.../org/apache/iceberg/aws/AwsClientFactories.java | 1 +
.../LakeFormationAwsClientFactory.java | 1 +
.../aws/s3/DefaultS3FileIOAwsClientFactory.java | 1 +
.../apache/iceberg/aws/s3/S3FileIOProperties.java | 124 +++++++++++++++++++++
.../iceberg/aws/s3/TestS3FileIOProperties.java | 15 +++
docs/docs/aws.md | 14 +++
7 files changed, 157 insertions(+)
diff --git
a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java
b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java
index d9ea511f9b..4c3c305d4b 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java
@@ -47,6 +47,7 @@ public class AssumeRoleAwsClientFactory implements
AwsClientFactory {
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
.applyMutation(s3FileIOProperties::applyServiceConfigurations)
.applyMutation(s3FileIOProperties::applySignerConfiguration)
+ .applyMutation(s3FileIOProperties::applyRetryConfigurations)
.build();
}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java
b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java
index 81c7bd6b4b..5974e21209 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java
@@ -114,6 +114,7 @@ public class AwsClientFactories {
.applyMutation(s3FileIOProperties::applySignerConfiguration)
.applyMutation(s3FileIOProperties::applyS3AccessGrantsConfigurations)
.applyMutation(s3FileIOProperties::applyUserAgentConfigurations)
+ .applyMutation(s3FileIOProperties::applyRetryConfigurations)
.build();
}
diff --git
a/aws/src/main/java/org/apache/iceberg/aws/lakeformation/LakeFormationAwsClientFactory.java
b/aws/src/main/java/org/apache/iceberg/aws/lakeformation/LakeFormationAwsClientFactory.java
index 552da4bc94..5d37470066 100644
---
a/aws/src/main/java/org/apache/iceberg/aws/lakeformation/LakeFormationAwsClientFactory.java
+++
b/aws/src/main/java/org/apache/iceberg/aws/lakeformation/LakeFormationAwsClientFactory.java
@@ -81,6 +81,7 @@ public class LakeFormationAwsClientFactory extends
AssumeRoleAwsClientFactory {
.applyMutation(httpClientProperties()::applyHttpClientConfigurations)
.applyMutation(s3FileIOProperties()::applyEndpointConfigurations)
.applyMutation(s3FileIOProperties()::applyServiceConfigurations)
+ .applyMutation(s3FileIOProperties()::applyRetryConfigurations)
.credentialsProvider(
new LakeFormationCredentialsProvider(lakeFormation(),
buildTableArn()))
.region(Region.of(region()))
diff --git
a/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java
index 18b40000a9..8687d737a5 100644
---
a/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java
+++
b/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java
@@ -55,6 +55,7 @@ class DefaultS3FileIOAwsClientFactory implements
S3FileIOAwsClientFactory {
.applyMutation(s3FileIOProperties::applySignerConfiguration)
.applyMutation(s3FileIOProperties::applyS3AccessGrantsConfigurations)
.applyMutation(s3FileIOProperties::applyUserAgentConfigurations)
+ .applyMutation(s3FileIOProperties::applyRetryConfigurations)
.build();
}
}
diff --git
a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
index 343540d87f..6813913a4d 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
@@ -20,10 +20,12 @@ package org.apache.iceberg.aws.s3;
import java.io.Serializable;
import java.net.URI;
+import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import javax.xml.stream.XMLStreamException;
import org.apache.iceberg.EnvironmentContext;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.glue.GlueCatalog;
@@ -38,6 +40,14 @@ import org.apache.iceberg.util.SerializableMap;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.core.exception.SdkServiceException;
+import software.amazon.awssdk.core.retry.RetryMode;
+import software.amazon.awssdk.core.retry.RetryPolicy;
+import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
+import software.amazon.awssdk.core.retry.conditions.OrRetryCondition;
+import software.amazon.awssdk.core.retry.conditions.RetryCondition;
+import software.amazon.awssdk.core.retry.conditions.RetryOnExceptionsCondition;
+import software.amazon.awssdk.core.retry.conditions.TokenBucketRetryCondition;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
@@ -393,6 +403,21 @@ public class S3FileIOProperties implements Serializable {
*/
private static final String S3_FILE_IO_USER_AGENT = "s3fileio/" +
EnvironmentContext.get();
+ /**
+  * Number of times to retry S3 operations. Applied by {@code applyRetryConfigurations} as the SDK
+  * retry policy's {@code numRetries}.
+  */
+ public static final String S3_RETRY_NUM_RETRIES = "s3.retry.num-retries";
+
+ /** Default value for {@link #S3_RETRY_NUM_RETRIES}. */
+ public static final int S3_RETRY_NUM_RETRIES_DEFAULT = 5;
+
+ /**
+  * Minimum wait time, in milliseconds, before retrying an S3 operation. Used as the base delay of
+  * the backoff strategy that {@code applyRetryConfigurations} installs for throttled requests.
+  */
+ public static final String S3_RETRY_MIN_WAIT_MS = "s3.retry.min-wait-ms";
+
+ /** Default value for {@link #S3_RETRY_MIN_WAIT_MS}. */
+ public static final long S3_RETRY_MIN_WAIT_MS_DEFAULT = 2_000; // 2 seconds
+
+ /**
+  * Maximum wait time, in milliseconds, before retrying an S3 operation. Used as the maximum backoff
+  * time of the backoff strategy that {@code applyRetryConfigurations} installs for throttled
+  * requests.
+  */
+ public static final String S3_RETRY_MAX_WAIT_MS = "s3.retry.max-wait-ms";
+
+ /** Default value for {@link #S3_RETRY_MAX_WAIT_MS}. */
+ public static final long S3_RETRY_MAX_WAIT_MS_DEFAULT = 20_000; // 20 seconds
+
private String sseType;
private String sseKey;
private String sseMd5;
@@ -423,6 +448,9 @@ public class S3FileIOProperties implements Serializable {
private final String endpoint;
private final boolean isRemoteSigningEnabled;
private String writeStorageClass;
+ // Retry settings read by applyRetryConfigurations; the two wait times are in milliseconds.
+ private int s3RetryNumRetries;
+ private long s3RetryMinWaitMs;
+ private long s3RetryMaxWaitMs;
private final Map<String, String> allProperties;
public S3FileIOProperties() {
@@ -455,6 +483,9 @@ public class S3FileIOProperties implements Serializable {
this.isRemoteSigningEnabled = REMOTE_SIGNING_ENABLED_DEFAULT;
this.isS3AccessGrantsEnabled = S3_ACCESS_GRANTS_ENABLED_DEFAULT;
this.isS3AccessGrantsFallbackToIamEnabled =
S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED_DEFAULT;
+ this.s3RetryNumRetries = S3_RETRY_NUM_RETRIES_DEFAULT;
+ this.s3RetryMinWaitMs = S3_RETRY_MIN_WAIT_MS_DEFAULT;
+ this.s3RetryMaxWaitMs = S3_RETRY_MAX_WAIT_MS_DEFAULT;
this.allProperties = Maps.newHashMap();
ValidationException.check(
@@ -553,6 +584,12 @@ public class S3FileIOProperties implements Serializable {
properties,
S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED,
S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED_DEFAULT);
+ this.s3RetryNumRetries =
+ PropertyUtil.propertyAsInt(properties, S3_RETRY_NUM_RETRIES,
S3_RETRY_NUM_RETRIES_DEFAULT);
+ this.s3RetryMinWaitMs =
+ PropertyUtil.propertyAsLong(properties, S3_RETRY_MIN_WAIT_MS,
S3_RETRY_MIN_WAIT_MS_DEFAULT);
+ this.s3RetryMaxWaitMs =
+ PropertyUtil.propertyAsLong(properties, S3_RETRY_MAX_WAIT_MS,
S3_RETRY_MAX_WAIT_MS_DEFAULT);
ValidationException.check(
keyIdAccessKeyBothConfigured(),
@@ -753,6 +790,34 @@ public class S3FileIOProperties implements Serializable {
this.isS3AccessGrantsFallbackToIamEnabled =
s3AccessGrantsFallbackToIamEnabled;
}
+ /** Returns the number of times S3 operations are retried ({@code s3.retry.num-retries}). */
+ public int s3RetryNumRetries() {
+ return s3RetryNumRetries;
+ }
+
+ /**
+  * Sets the number of times S3 operations are retried. The value is stored as-is; no validation is
+  * performed here.
+  */
+ public void setS3RetryNumRetries(int s3RetryNumRetries) {
+ this.s3RetryNumRetries = s3RetryNumRetries;
+ }
+
+ /** Returns the minimum retry wait time in milliseconds ({@code s3.retry.min-wait-ms}). */
+ public long s3RetryMinWaitMs() {
+ return s3RetryMinWaitMs;
+ }
+
+ /**
+  * Sets the minimum retry wait time in milliseconds. The value is stored as-is; no validation is
+  * performed here.
+  */
+ public void setS3RetryMinWaitMs(long s3RetryMinWaitMs) {
+ this.s3RetryMinWaitMs = s3RetryMinWaitMs;
+ }
+
+ /** Returns the maximum retry wait time in milliseconds ({@code s3.retry.max-wait-ms}). */
+ public long s3RetryMaxWaitMs() {
+ return s3RetryMaxWaitMs;
+ }
+
+ /**
+  * Sets the maximum retry wait time in milliseconds. The value is stored as-is; no validation is
+  * performed here.
+  */
+ public void setS3RetryMaxWaitMs(long s3RetryMaxWaitMs) {
+ this.s3RetryMaxWaitMs = s3RetryMaxWaitMs;
+ }
+
+ /**
+  * Returns {@link #s3RetryNumRetries()} multiplied by {@link #s3RetryMaxWaitMs()}, computed in
+  * {@code long} arithmetic.
+  *
+  * <p>NOTE(review): this reads as a worst-case cumulative wait assuming every retry waits the
+  * maximum. The max wait is only wired into the throttling backoff strategy, so non-throttled
+  * retries may wait differently. Confirm the intended semantics with callers.
+  */
+ public long s3RetryTotalWaitMs() {
+ return (long) s3RetryNumRetries() * s3RetryMaxWaitMs();
+ }
+
private boolean keyIdAccessKeyBothConfigured() {
return (accessKeyId == null) == (secretAccessKey == null);
}
@@ -824,6 +889,65 @@ public class S3FileIOProperties implements Serializable {
}
}
+ /**
+ * Override the retry configurations for an S3 client.
+ *
+ * <p>Installs a {@link RetryPolicy} built in {@link RetryMode#LEGACY} mode that:
+ *
+ * <ul>
+ *   <li>retries up to {@code s3.retry.num-retries} times;
+ *   <li>backs off throttled requests with an {@link EqualJitterBackoffStrategy} whose base delay is
+ *       {@code s3.retry.min-wait-ms} and whose maximum backoff is {@code s3.retry.max-wait-ms};
+ *   <li>retries when either the SDK default retry condition matches or the failure is an
+ *       {@link XMLStreamException};
+ *   <li>uses a 500-token retry-capacity bucket, charging 0 tokens for throttling exceptions and
+ *       HTTP 503 responses, and 5 tokens for any other exception.
+ * </ul>
+ *
+ * <p>Sample usage:
+ *
+ * <pre>
+ *     S3Client.builder().applyMutation(s3FileIOProperties::applyRetryConfigurations)
+ * </pre>
+ *
+ * @param builder the S3 client builder whose override configuration receives the retry policy
+ * @param <T> the concrete S3 client builder type
+ */
+ public <T extends S3ClientBuilder> void applyRetryConfigurations(T builder) {
+ builder.overrideConfiguration(
+ config ->
+ config.retryPolicy(
+ // Use a retry strategy which will persistently retry throttled exceptions with
+ // exponential backoff, to give S3 a chance to autoscale.
+ // LEGACY mode works best here, as it will allow throttled exceptions to use all of
+ // the configured retry attempts.
+ RetryPolicy.builder(RetryMode.LEGACY)
+ .numRetries(s3RetryNumRetries)
+ .throttlingBackoffStrategy(
+ EqualJitterBackoffStrategy.builder()
+ .baseDelay(Duration.ofMillis(s3RetryMinWaitMs))
+
.maxBackoffTime(Duration.ofMillis(s3RetryMaxWaitMs))
+ .build())
+
+ // Workaround: add XMLStreamException as a retryable exception.
+ // https://github.com/aws/aws-sdk-java-v2/issues/5442
+ // Without this workaround, we see SDK failures if there's a socket exception
+ // while parsing an error XML response.
+ .retryCondition(
+ OrRetryCondition.create(
+ RetryCondition.defaultRetryCondition(),
+
RetryOnExceptionsCondition.create(XMLStreamException.class)))
+
+ // Workaround: exclude all 503s from consuming retry tokens.
+ // https://github.com/aws/aws-sdk-java-v2/issues/5414
+ // Without this workaround, workloads which see 503s from S3 HEAD will fail
+ // prematurely.
+ .retryCapacityCondition(
+ TokenBucketRetryCondition.builder()
+ .tokenBucketSize(500) // 500 is the SDK default
+ .exceptionCostFunction(
+ e -> {
+ // Throttling errors and 503s cost nothing, so they never drain the bucket.
+ if (e instanceof SdkServiceException) {
+ SdkServiceException sdkServiceException =
+ (SdkServiceException) e;
+ if
(sdkServiceException.isThrottlingException()
+ || sdkServiceException.statusCode() ==
503) {
+ return 0;
+ }
+ }
+
+ // 5 is the SDK default for non-throttling exceptions
+ return 5;
+ })
+ .build())
+ .build()));
+ }
+
/**
* Add the S3 Access Grants Plugin for an S3 client.
*
diff --git
a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java
b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java
index f445a2d224..a61b9efb9f 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java
@@ -36,6 +36,8 @@ import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.retry.RetryPolicy;
+import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
@@ -491,4 +493,17 @@ public class TestS3FileIOProperties {
Mockito.verify(mockS3ClientBuilder)
.overrideConfiguration(Mockito.any(ClientOverrideConfiguration.class));
}
+
+ // Verifies that s3.retry.num-retries is parsed from properties and propagated into the
+ // RetryPolicy that applyRetryConfigurations installs on a real S3ClientBuilder.
+ @Test
+ public void testApplyRetryConfiguration() {
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(S3FileIOProperties.S3_RETRY_NUM_RETRIES, "999");
+ S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties);
+
+ S3ClientBuilder builder = S3Client.builder();
+ s3FileIOProperties.applyRetryConfigurations(builder);
+
+ RetryPolicy retryPolicy =
builder.overrideConfiguration().retryPolicy().get();
+ assertThat(retryPolicy.numRetries()).as("retries was not
set").isEqualTo(999);
+ }
}
diff --git a/docs/docs/aws.md b/docs/docs/aws.md
index b9638bb50d..5a166c0c91 100644
--- a/docs/docs/aws.md
+++ b/docs/docs/aws.md
@@ -378,6 +378,20 @@ However, for the older versions up to 0.12.0, the logic is
as follows:
For more details, please refer to the [LocationProvider
Configuration](custom-catalog.md#custom-location-provider-implementation)
section.
+### S3 Retries
+
+Workloads which encounter S3 throttling should persistently retry, with exponential backoff, to make progress while S3
+automatically scales. We provide the configurations below to adjust S3 retries for this purpose. For workloads that encounter
+throttling and fail due to retry exhaustion, we recommend setting the retry count to 32 in order to allow S3 to auto-scale. Note that
+for workloads with exceptionally high throughput against tables that S3 has not yet scaled, it may be necessary to increase the retry count further.
+
+
+| Property             | Default       | Description                                                                            |
+|----------------------|---------------|----------------------------------------------------------------------------------------|
+| s3.retry.num-retries | 5             | Number of times to retry S3 operations. Recommended 32 for high-throughput workloads. |
+| s3.retry.min-wait-ms | 2000 (2s)     | Minimum wait time, in milliseconds, before retrying a throttled S3 operation.          |
+| s3.retry.max-wait-ms | 20000 (20s)   | Maximum wait time, in milliseconds, before retrying a throttled S3 operation.          |
+
### S3 Strong Consistency
In November 2020, S3 announced [strong
consistency](https://aws.amazon.com/s3/consistency/) for all read operations,
and Iceberg is updated to fully leverage this feature.