This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 09c737656d AWS: Add configuration and set defaults for S3 retry 
behaviour (#11052)
09c737656d is described below

commit 09c737656d316ab6172e0d5ee6920869237e6fd1
Author: Ozan Okumusoglu <[email protected]>
AuthorDate: Tue Oct 1 15:28:52 2024 -0700

    AWS: Add configuration and set defaults for S3 retry behaviour (#11052)
    
    Co-authored-by: Drew Schleit <[email protected]>
---
 .../iceberg/aws/AssumeRoleAwsClientFactory.java    |   1 +
 .../org/apache/iceberg/aws/AwsClientFactories.java |   1 +
 .../LakeFormationAwsClientFactory.java             |   1 +
 .../aws/s3/DefaultS3FileIOAwsClientFactory.java    |   1 +
 .../apache/iceberg/aws/s3/S3FileIOProperties.java  | 124 +++++++++++++++++++++
 .../iceberg/aws/s3/TestS3FileIOProperties.java     |  15 +++
 docs/docs/aws.md                                   |  14 +++
 7 files changed, 157 insertions(+)

diff --git 
a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java 
b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java
index d9ea511f9b..4c3c305d4b 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AssumeRoleAwsClientFactory.java
@@ -47,6 +47,7 @@ public class AssumeRoleAwsClientFactory implements 
AwsClientFactory {
         .applyMutation(s3FileIOProperties::applyEndpointConfigurations)
         .applyMutation(s3FileIOProperties::applyServiceConfigurations)
         .applyMutation(s3FileIOProperties::applySignerConfiguration)
+        .applyMutation(s3FileIOProperties::applyRetryConfigurations)
         .build();
   }
 
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java 
b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java
index 81c7bd6b4b..5974e21209 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientFactories.java
@@ -114,6 +114,7 @@ public class AwsClientFactories {
           .applyMutation(s3FileIOProperties::applySignerConfiguration)
           .applyMutation(s3FileIOProperties::applyS3AccessGrantsConfigurations)
           .applyMutation(s3FileIOProperties::applyUserAgentConfigurations)
+          .applyMutation(s3FileIOProperties::applyRetryConfigurations)
           .build();
     }
 
diff --git 
a/aws/src/main/java/org/apache/iceberg/aws/lakeformation/LakeFormationAwsClientFactory.java
 
b/aws/src/main/java/org/apache/iceberg/aws/lakeformation/LakeFormationAwsClientFactory.java
index 552da4bc94..5d37470066 100644
--- 
a/aws/src/main/java/org/apache/iceberg/aws/lakeformation/LakeFormationAwsClientFactory.java
+++ 
b/aws/src/main/java/org/apache/iceberg/aws/lakeformation/LakeFormationAwsClientFactory.java
@@ -81,6 +81,7 @@ public class LakeFormationAwsClientFactory extends 
AssumeRoleAwsClientFactory {
           .applyMutation(httpClientProperties()::applyHttpClientConfigurations)
           .applyMutation(s3FileIOProperties()::applyEndpointConfigurations)
           .applyMutation(s3FileIOProperties()::applyServiceConfigurations)
+          .applyMutation(s3FileIOProperties()::applyRetryConfigurations)
           .credentialsProvider(
               new LakeFormationCredentialsProvider(lakeFormation(), 
buildTableArn()))
           .region(Region.of(region()))
diff --git 
a/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java
 
b/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java
index 18b40000a9..8687d737a5 100644
--- 
a/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java
+++ 
b/aws/src/main/java/org/apache/iceberg/aws/s3/DefaultS3FileIOAwsClientFactory.java
@@ -55,6 +55,7 @@ class DefaultS3FileIOAwsClientFactory implements 
S3FileIOAwsClientFactory {
         .applyMutation(s3FileIOProperties::applySignerConfiguration)
         .applyMutation(s3FileIOProperties::applyS3AccessGrantsConfigurations)
         .applyMutation(s3FileIOProperties::applyUserAgentConfigurations)
+        .applyMutation(s3FileIOProperties::applyRetryConfigurations)
         .build();
   }
 }
diff --git 
a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java 
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
index 343540d87f..6813913a4d 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
@@ -20,10 +20,12 @@ package org.apache.iceberg.aws.s3;
 
 import java.io.Serializable;
 import java.net.URI;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import javax.xml.stream.XMLStreamException;
 import org.apache.iceberg.EnvironmentContext;
 import org.apache.iceberg.aws.AwsClientProperties;
 import org.apache.iceberg.aws.glue.GlueCatalog;
@@ -38,6 +40,14 @@ import org.apache.iceberg.util.SerializableMap;
 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
 import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.core.exception.SdkServiceException;
+import software.amazon.awssdk.core.retry.RetryMode;
+import software.amazon.awssdk.core.retry.RetryPolicy;
+import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
+import software.amazon.awssdk.core.retry.conditions.OrRetryCondition;
+import software.amazon.awssdk.core.retry.conditions.RetryCondition;
+import software.amazon.awssdk.core.retry.conditions.RetryOnExceptionsCondition;
+import software.amazon.awssdk.core.retry.conditions.TokenBucketRetryCondition;
 import software.amazon.awssdk.services.s3.S3ClientBuilder;
 import software.amazon.awssdk.services.s3.S3Configuration;
 import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
@@ -393,6 +403,21 @@ public class S3FileIOProperties implements Serializable {
    */
   private static final String S3_FILE_IO_USER_AGENT = "s3fileio/" + 
EnvironmentContext.get();
 
+  /** Number of times to retry S3 operations. */
+  public static final String S3_RETRY_NUM_RETRIES = "s3.retry.num-retries";
+
+  public static final int S3_RETRY_NUM_RETRIES_DEFAULT = 5;
+
+  /** Minimum wait time to retry a S3 operation */
+  public static final String S3_RETRY_MIN_WAIT_MS = "s3.retry.min-wait-ms";
+
+  public static final long S3_RETRY_MIN_WAIT_MS_DEFAULT = 2_000; // 2 seconds
+
+  /** Maximum wait time to retry a S3 operation */
+  public static final String S3_RETRY_MAX_WAIT_MS = "s3.retry.max-wait-ms";
+
+  public static final long S3_RETRY_MAX_WAIT_MS_DEFAULT = 20_000; // 20 seconds
+
   private String sseType;
   private String sseKey;
   private String sseMd5;
@@ -423,6 +448,9 @@ public class S3FileIOProperties implements Serializable {
   private final String endpoint;
   private final boolean isRemoteSigningEnabled;
   private String writeStorageClass;
+  private int s3RetryNumRetries;
+  private long s3RetryMinWaitMs;
+  private long s3RetryMaxWaitMs;
   private final Map<String, String> allProperties;
 
   public S3FileIOProperties() {
@@ -455,6 +483,9 @@ public class S3FileIOProperties implements Serializable {
     this.isRemoteSigningEnabled = REMOTE_SIGNING_ENABLED_DEFAULT;
     this.isS3AccessGrantsEnabled = S3_ACCESS_GRANTS_ENABLED_DEFAULT;
     this.isS3AccessGrantsFallbackToIamEnabled = 
S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED_DEFAULT;
+    this.s3RetryNumRetries = S3_RETRY_NUM_RETRIES_DEFAULT;
+    this.s3RetryMinWaitMs = S3_RETRY_MIN_WAIT_MS_DEFAULT;
+    this.s3RetryMaxWaitMs = S3_RETRY_MAX_WAIT_MS_DEFAULT;
     this.allProperties = Maps.newHashMap();
 
     ValidationException.check(
@@ -553,6 +584,12 @@ public class S3FileIOProperties implements Serializable {
             properties,
             S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED,
             S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED_DEFAULT);
+    this.s3RetryNumRetries =
+        PropertyUtil.propertyAsInt(properties, S3_RETRY_NUM_RETRIES, 
S3_RETRY_NUM_RETRIES_DEFAULT);
+    this.s3RetryMinWaitMs =
+        PropertyUtil.propertyAsLong(properties, S3_RETRY_MIN_WAIT_MS, 
S3_RETRY_MIN_WAIT_MS_DEFAULT);
+    this.s3RetryMaxWaitMs =
+        PropertyUtil.propertyAsLong(properties, S3_RETRY_MAX_WAIT_MS, 
S3_RETRY_MAX_WAIT_MS_DEFAULT);
 
     ValidationException.check(
         keyIdAccessKeyBothConfigured(),
@@ -753,6 +790,34 @@ public class S3FileIOProperties implements Serializable {
     this.isS3AccessGrantsFallbackToIamEnabled = 
s3AccessGrantsFallbackToIamEnabled;
   }
 
+  public int s3RetryNumRetries() {
+    return s3RetryNumRetries;
+  }
+
+  public void setS3RetryNumRetries(int s3RetryNumRetries) {
+    this.s3RetryNumRetries = s3RetryNumRetries;
+  }
+
+  public long s3RetryMinWaitMs() {
+    return s3RetryMinWaitMs;
+  }
+
+  public void setS3RetryMinWaitMs(long s3RetryMinWaitMs) {
+    this.s3RetryMinWaitMs = s3RetryMinWaitMs;
+  }
+
+  public long s3RetryMaxWaitMs() {
+    return s3RetryMaxWaitMs;
+  }
+
+  public void setS3RetryMaxWaitMs(long s3RetryMaxWaitMs) {
+    this.s3RetryMaxWaitMs = s3RetryMaxWaitMs;
+  }
+
+  public long s3RetryTotalWaitMs() {
+    return (long) s3RetryNumRetries() * s3RetryMaxWaitMs();
+  }
+
   private boolean keyIdAccessKeyBothConfigured() {
     return (accessKeyId == null) == (secretAccessKey == null);
   }
@@ -824,6 +889,65 @@ public class S3FileIOProperties implements Serializable {
     }
   }
 
+  /**
+   * Override the retry configurations for an S3 client.
+   *
+   * <p>Sample usage:
+   *
+   * <pre>
+   *     
S3Client.builder().applyMutation(s3FileIOProperties::applyRetryConfigurations)
+   * </pre>
+   */
+  public <T extends S3ClientBuilder> void applyRetryConfigurations(T builder) {
+    builder.overrideConfiguration(
+        config ->
+            config.retryPolicy(
+                // Use a retry strategy which will persistently retry 
throttled exceptions with
+                // exponential backoff, to give S3 a chance to autoscale.
+                // LEGACY mode works best here, as it will allow throttled 
exceptions to use all of
+                // the configured retry attempts.
+                RetryPolicy.builder(RetryMode.LEGACY)
+                    .numRetries(s3RetryNumRetries)
+                    .throttlingBackoffStrategy(
+                        EqualJitterBackoffStrategy.builder()
+                            .baseDelay(Duration.ofMillis(s3RetryMinWaitMs))
+                            
.maxBackoffTime(Duration.ofMillis(s3RetryMaxWaitMs))
+                            .build())
+
+                    // Workaround: add XMLStreamException as a retryable 
exception.
+                    // https://github.com/aws/aws-sdk-java-v2/issues/5442
+                    // Without this workaround, we see SDK failures if there's 
a socket exception
+                    // while parsing an error XML response.
+                    .retryCondition(
+                        OrRetryCondition.create(
+                            RetryCondition.defaultRetryCondition(),
+                            
RetryOnExceptionsCondition.create(XMLStreamException.class)))
+
+                    // Workaround: exclude all 503s from consuming retry 
tokens.
+                    // https://github.com/aws/aws-sdk-java-v2/issues/5414
+                    // Without this workaround, workloads which see 503s from 
S3 HEAD will fail
+                    // prematurely.
+                    .retryCapacityCondition(
+                        TokenBucketRetryCondition.builder()
+                            .tokenBucketSize(500) // 500 is the SDK default
+                            .exceptionCostFunction(
+                                e -> {
+                                  if (e instanceof SdkServiceException) {
+                                    SdkServiceException sdkServiceException =
+                                        (SdkServiceException) e;
+                                    if 
(sdkServiceException.isThrottlingException()
+                                        || sdkServiceException.statusCode() == 
503) {
+                                      return 0;
+                                    }
+                                  }
+
+                                  // 5 is the SDK default for non-throttling 
exceptions
+                                  return 5;
+                                })
+                            .build())
+                    .build()));
+  }
+
   /**
    * Add the S3 Access Grants Plugin for an S3 client.
    *
diff --git 
a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java 
b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java
index f445a2d224..a61b9efb9f 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java
@@ -36,6 +36,8 @@ import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.retry.RetryPolicy;
+import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3ClientBuilder;
 import software.amazon.awssdk.services.s3.S3Configuration;
 import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
@@ -491,4 +493,17 @@ public class TestS3FileIOProperties {
     Mockito.verify(mockS3ClientBuilder)
         .overrideConfiguration(Mockito.any(ClientOverrideConfiguration.class));
   }
+
+  @Test
+  public void testApplyRetryConfiguration() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(S3FileIOProperties.S3_RETRY_NUM_RETRIES, "999");
+    S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties);
+
+    S3ClientBuilder builder = S3Client.builder();
+    s3FileIOProperties.applyRetryConfigurations(builder);
+
+    RetryPolicy retryPolicy = 
builder.overrideConfiguration().retryPolicy().get();
+    assertThat(retryPolicy.numRetries()).as("retries was not 
set").isEqualTo(999);
+  }
 }
diff --git a/docs/docs/aws.md b/docs/docs/aws.md
index b9638bb50d..5a166c0c91 100644
--- a/docs/docs/aws.md
+++ b/docs/docs/aws.md
@@ -378,6 +378,20 @@ However, for the older versions up to 0.12.0, the logic is 
as follows:
 
 For more details, please refer to the [LocationProvider 
Configuration](custom-catalog.md#custom-location-provider-implementation) 
section.  
 
+### S3 Retries
+
+Workloads which encounter S3 throttling should persistently retry, with 
exponential backoff, to make progress while S3
+automatically scales. We provide the configurations below to adjust S3 retries 
for this purpose. For workloads that encounter
+throttling and fail due to retry exhaustion, we recommend setting the retry count to 32 in order to allow S3 to auto-scale. Note that
+for workloads with exceptionally high throughput against tables that S3 has not yet scaled, it may be necessary to increase the retry count further.
+
+
+| Property             | Default | Description                                 
                                          |
+|----------------------|---------|---------------------------------------------------------------------------------------|
+| s3.retry.num-retries | 5       | Number of times to retry S3 operations. 
Recommended 32 for high-throughput workloads. |
+| s3.retry.min-wait-ms | 2000    | Minimum wait time (in milliseconds) to retry a S3 operation.                          |
+| s3.retry.max-wait-ms | 20000   | Maximum wait time (in milliseconds) to retry a S3 operation.                          |
+
 ### S3 Strong Consistency
 
 In November 2020, S3 announced [strong 
consistency](https://aws.amazon.com/s3/consistency/) for all read operations, 
and Iceberg is updated to fully leverage this feature.

Reply via email to