This is an automated email from the ASF dual-hosted git repository.

dimas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new a4c62b33 More sensible BasePolarisCatalog retry behavior (#1046)
a4c62b33 is described below

commit a4c62b3330cd80e567a3a9bd197c5110cf123b09
Author: Andrew Guterman <[email protected]>
AuthorDate: Wed Feb 26 14:34:34 2025 -0800

    More sensible BasePolarisCatalog retry behavior (#1046)
    
    * More sensible BasePolarisCatalog retry behavior
    
    * Reuse AWS/GCP retryable property
    
    * Update default to 2
---
 .../apache/polaris/core/PolarisConfiguration.java  |  8 +++
 .../quarkus/catalog/BasePolarisCatalogTest.java    | 70 +++++++++++++++------
 service/common/build.gradle.kts                    |  5 ++
 .../service/catalog/BasePolarisCatalog.java        | 51 ++++-----------
 .../service/exception/IcebergExceptionMapper.java  | 69 ++++++++++++++++++---
 .../exception/IcebergExceptionMapperTest.java      | 19 +-----
 .../service/exception/FakeAzureHttpResponse.java   | 72 ++++++++++++++++++++++
 7 files changed, 212 insertions(+), 82 deletions(-)

diff --git a/polaris-core/src/main/java/org/apache/polaris/core/PolarisConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/PolarisConfiguration.java
index c57d58da..ca1962e3 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/PolarisConfiguration.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/PolarisConfiguration.java
@@ -264,4 +264,12 @@ public class PolarisConfiguration<T> {
                   + STORAGE_CREDENTIAL_DURATION_SECONDS.key)
           .defaultValue(30 * 60) // 30 minutes
           .build();
+
+  public static final PolarisConfiguration<Integer> 
MAX_METADATA_REFRESH_RETRIES =
+      PolarisConfiguration.<Integer>builder()
+          .key("MAX_METADATA_REFRESH_RETRIES")
+          .description(
+              "How many times to retry refreshing metadata when the previous 
error was retryable")
+          .defaultValue(2)
+          .build();
 }
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java
index 370ede66..7f5701e2 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java
@@ -26,8 +26,9 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
+import com.azure.core.exception.HttpResponseException;
+import com.google.cloud.storage.StorageException;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterators;
 import io.quarkus.test.junit.QuarkusMock;
 import io.quarkus.test.junit.QuarkusTest;
 import io.quarkus.test.junit.QuarkusTestProfile;
@@ -39,12 +40,14 @@ import java.lang.reflect.Method;
 import java.time.Clock;
 import java.util.Arrays;
 import java.util.EnumMap;
-import java.util.Iterator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Stream;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogProperties;
@@ -104,6 +107,7 @@ import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
 import org.apache.polaris.service.catalog.io.FileIOFactory;
 import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
 import org.apache.polaris.service.config.RealmEntityManagerFactory;
+import org.apache.polaris.service.exception.FakeAzureHttpResponse;
 import org.apache.polaris.service.exception.IcebergExceptionMapper;
 import 
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
 import org.apache.polaris.service.task.TableCleanupTaskHandler;
@@ -120,7 +124,12 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.Mockito;
+import software.amazon.awssdk.core.exception.NonRetryableException;
+import software.amazon.awssdk.core.exception.RetryableException;
 import software.amazon.awssdk.services.sts.StsClient;
 import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
 import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
@@ -1517,23 +1526,46 @@ public class BasePolarisCatalogTest extends CatalogTests<BasePolarisCatalog> {
     }
   }
 
-  @Test
-  public void testRetriableException() {
-    Iterator<String> accessDeniedHint =
-        Iterators.cycle(IcebergExceptionMapper.getAccessDeniedHints());
-    RuntimeException s3Exception = new 
RuntimeException(accessDeniedHint.next());
-    RuntimeException azureBlobStorageException = new 
RuntimeException(accessDeniedHint.next());
-    RuntimeException gcsException = new 
RuntimeException(accessDeniedHint.next());
-    RuntimeException otherException = new RuntimeException(new 
IOException("Connection reset"));
-    
Assertions.assertThat(BasePolarisCatalog.SHOULD_RETRY_REFRESH_PREDICATE.test(s3Exception))
-        .isFalse();
-    Assertions.assertThat(
-            
BasePolarisCatalog.SHOULD_RETRY_REFRESH_PREDICATE.test(azureBlobStorageException))
-        .isFalse();
-    
Assertions.assertThat(BasePolarisCatalog.SHOULD_RETRY_REFRESH_PREDICATE.test(gcsException))
-        .isFalse();
-    
Assertions.assertThat(BasePolarisCatalog.SHOULD_RETRY_REFRESH_PREDICATE.test(otherException))
-        .isTrue();
+  @ParameterizedTest
+  @MethodSource
+  public void testRetriableException(Exception exception, boolean shouldRetry) 
{
+    
Assertions.assertThat(BasePolarisCatalog.SHOULD_RETRY_REFRESH_PREDICATE.test(exception))
+        .isEqualTo(shouldRetry);
+  }
+
+  static Stream<Arguments> testRetriableException() {
+    Set<Integer> NON_RETRYABLE_CODES = Set.of(401, 403, 404);
+    Set<Integer> RETRYABLE_CODES = Set.of(408, 504);
+
+    // Create a map of HTTP code returned from a cloud provider to whether it 
should be retried
+    Map<Integer, Boolean> cloudCodeMappings = new HashMap<>();
+    NON_RETRYABLE_CODES.forEach(code -> cloudCodeMappings.put(code, false));
+    RETRYABLE_CODES.forEach(code -> cloudCodeMappings.put(code, true));
+
+    return Stream.of(
+            Stream.of(
+                Arguments.of(new RuntimeException(new IOException("Connection 
reset")), true),
+                Arguments.of(RetryableException.builder().build(), true),
+                Arguments.of(NonRetryableException.builder().build(), false)),
+            IcebergExceptionMapper.getAccessDeniedHints().stream()
+                .map(hint -> Arguments.of(new RuntimeException(hint), false)),
+            cloudCodeMappings.entrySet().stream()
+                .flatMap(
+                    entry ->
+                        Stream.of(
+                            Arguments.of(
+                                new HttpResponseException(
+                                    "", new 
FakeAzureHttpResponse(entry.getKey()), ""),
+                                entry.getValue()),
+                            Arguments.of(
+                                new StorageException(entry.getKey(), ""), 
entry.getValue()))),
+            IcebergExceptionMapper.RETRYABLE_AZURE_HTTP_CODES.stream()
+                .map(
+                    code ->
+                        Arguments.of(
+                            new HttpResponseException("", new 
FakeAzureHttpResponse(code), ""),
+                            true)))
+        .flatMap(Function.identity());
   }
 
   @Test
diff --git a/service/common/build.gradle.kts b/service/common/build.gradle.kts
index 3d6c90f4..a98ef50a 100644
--- a/service/common/build.gradle.kts
+++ b/service/common/build.gradle.kts
@@ -121,6 +121,11 @@ dependencies {
   testFixturesImplementation("software.amazon.awssdk:sts")
   testFixturesImplementation("software.amazon.awssdk:iam-policy-builder")
   testFixturesImplementation("software.amazon.awssdk:s3")
+
+  testFixturesImplementation(platform(libs.azuresdk.bom))
+  testFixturesImplementation("com.azure:azure-core")
+  testFixturesImplementation("com.azure:azure-storage-blob")
+  testFixturesImplementation("com.azure:azure-storage-file-datalake")
 }
 
 tasks.named("javadoc") { dependsOn("jandex") }
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
index c42bb7fd..f06850d7 100644
--- a/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
@@ -18,6 +18,8 @@
  */
 package org.apache.polaris.service.catalog;
 
+import static 
org.apache.polaris.service.exception.IcebergExceptionMapper.isStorageProviderRetryableException;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
@@ -104,13 +106,11 @@ import org.apache.polaris.core.storage.PolarisStorageIntegration;
 import org.apache.polaris.core.storage.StorageLocation;
 import org.apache.polaris.service.catalog.io.FileIOFactory;
 import org.apache.polaris.service.catalog.io.FileIOUtil;
-import org.apache.polaris.service.exception.IcebergExceptionMapper;
 import org.apache.polaris.service.task.TaskExecutor;
 import org.apache.polaris.service.types.NotificationRequest;
 import org.apache.polaris.service.types.NotificationType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.core.exception.SdkException;
 
 /** Defines the relationship between PolarisEntities and Iceberg's business 
logic. */
 public class BasePolarisCatalog extends BaseMetastoreViewCatalog
@@ -134,8 +134,6 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog
       "INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST";
   static final boolean INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST_DEFAULT = 
false;
 
-  private static final int MAX_RETRIES = 12;
-
   public static final Predicate<Exception> SHOULD_RETRY_REFRESH_PREDICATE =
       ex -> {
         // Default arguments from BaseMetastoreTableOperation only stop 
retries on
@@ -146,7 +144,8 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog
             && !(ex instanceof AlreadyExistsException)
             && !(ex instanceof ForbiddenException)
             && !(ex instanceof UnprocessableEntityException)
-            && isStorageProviderRetryableException(ex);
+            && (isStorageProviderRetryableException(ex)
+                || 
isStorageProviderRetryableException(ExceptionUtils.getRootCause(ex)));
       };
 
   private final PolarisEntityManager entityManager;
@@ -1258,7 +1257,7 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog
         refreshFromMetadataLocation(
             latestLocation,
             SHOULD_RETRY_REFRESH_PREDICATE,
-            MAX_RETRIES,
+            getMaxMetadataRefreshRetries(),
             metadataLocation -> {
               String latestLocationDir =
                   latestLocation.substring(0, latestLocation.lastIndexOf('/'));
@@ -1483,7 +1482,7 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog
         refreshFromMetadataLocation(
             latestLocation,
             SHOULD_RETRY_REFRESH_PREDICATE,
-            MAX_RETRIES,
+            getMaxMetadataRefreshRetries(),
             metadataLocation -> {
               String latestLocationDir =
                   latestLocation.substring(0, latestLocation.lastIndexOf('/'));
@@ -2097,37 +2096,11 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog
         .getConfiguration(callContext.getPolarisCallContext(), configKey, 
defaultValue);
   }
 
-  /**
-   * Check if the exception is retryable for the storage provider
-   *
-   * @param ex exception
-   * @return true if the exception is retryable
-   */
-  private static boolean isStorageProviderRetryableException(Exception ex) {
-    // For S3/Azure, the exception is not wrapped, while for GCP the exception 
is wrapped as a
-    // RuntimeException
-    Throwable rootCause = ExceptionUtils.getRootCause(ex);
-    if (rootCause == null) {
-      // no root cause, let it retry
-      return true;
-    }
-    // only S3 SdkException has this retryable property
-    if (rootCause instanceof SdkException && ((SdkException) 
rootCause).retryable()) {
-      return true;
-    }
-    // add more cases here if needed
-    // AccessDenied is not retryable
-    return !isAccessDenied(rootCause.getMessage());
-  }
-
-  private static boolean isAccessDenied(String errorMsg) {
-    // Corresponding error messages for storage providers Aws/Azure/Gcp
-    boolean isAccessDenied =
-        errorMsg != null && 
IcebergExceptionMapper.containsAnyAccessDeniedHint(errorMsg);
-    if (isAccessDenied) {
-      LOGGER.debug("Access Denied or Forbidden error: {}", errorMsg);
-      return true;
-    }
-    return false;
+  private int getMaxMetadataRefreshRetries() {
+    return callContext
+        .getPolarisCallContext()
+        .getConfigurationStore()
+        .getConfiguration(
+            callContext.getPolarisCallContext(), 
PolarisConfiguration.MAX_METADATA_REFRESH_RETRIES);
   }
 }
diff --git a/service/common/src/main/java/org/apache/polaris/service/exception/IcebergExceptionMapper.java b/service/common/src/main/java/org/apache/polaris/service/exception/IcebergExceptionMapper.java
index 248aa7f5..25e33467 100644
--- a/service/common/src/main/java/org/apache/polaris/service/exception/IcebergExceptionMapper.java
+++ b/service/common/src/main/java/org/apache/polaris/service/exception/IcebergExceptionMapper.java
@@ -20,6 +20,7 @@ package org.apache.polaris.service.exception;
 
 import com.azure.core.exception.AzureException;
 import com.azure.core.exception.HttpResponseException;
+import com.google.cloud.BaseServiceException;
 import com.google.cloud.storage.StorageException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
@@ -58,10 +59,23 @@ import org.apache.iceberg.rest.responses.ErrorResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;
+import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.services.s3.model.S3Exception;
 
 @Provider
 public class IcebergExceptionMapper implements 
ExceptionMapper<RuntimeException> {
+  /** Signifies that we could not extract an HTTP code from a given cloud 
exception */
+  public static final int UNKNOWN_CLOUD_HTTP_CODE = -1;
+
+  public static final Set<Integer> RETRYABLE_AZURE_HTTP_CODES =
+      Set.of(
+          Response.Status.REQUEST_TIMEOUT.getStatusCode(),
+          Response.Status.TOO_MANY_REQUESTS.getStatusCode(),
+          Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+          Response.Status.SERVICE_UNAVAILABLE.getStatusCode(),
+          Response.Status.GATEWAY_TIMEOUT.getStatusCode(),
+          IcebergExceptionMapper.UNKNOWN_CLOUD_HTTP_CODE);
+
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IcebergExceptionMapper.class);
 
   // Case-insensitive parts of exception messages that a request to a cloud 
provider was denied due
@@ -115,6 +129,36 @@ public class IcebergExceptionMapper implements ExceptionMapper<RuntimeException>
     return ACCESS_DENIED_HINTS.stream().anyMatch(messageLower::contains);
   }
 
+  /**
+   * Check if the exception is retryable for the storage provider
+   *
+   * @param ex exception
+   * @return true if the exception is retryable
+   */
+  public static boolean isStorageProviderRetryableException(Throwable ex) {
+    if (ex == null) {
+      return false;
+    }
+
+    if (ex.getMessage() != null && 
containsAnyAccessDeniedHint(ex.getMessage())) {
+      return false;
+    }
+
+    return switch (ex) {
+      // GCS
+      case BaseServiceException bse -> bse.isRetryable();
+
+      // S3
+      case SdkException se -> se.retryable();
+
+      // Azure exceptions don't have a retryable property so we just check the 
HTTP code
+      case HttpResponseException hre ->
+          RETRYABLE_AZURE_HTTP_CODES.contains(
+              IcebergExceptionMapper.extractHttpCodeFromCloudException(hre));
+      default -> true;
+    };
+  }
+
   @VisibleForTesting
   public static Collection<String> getAccessDeniedHints() {
     return ImmutableSet.copyOf(ACCESS_DENIED_HINTS);
@@ -158,18 +202,29 @@ public class IcebergExceptionMapper implements ExceptionMapper<RuntimeException>
     };
   }
 
+  /**
+   * We typically call cloud providers over HTTP, so when there's an exception 
there's typically an
+   * associated HTTP code. This extracts the HTTP code if possible.
+   *
+   * @param rex The cloud provider exception
+   * @return UNKNOWN_CLOUD_HTTP_CODE if the exception is not a cloud exception 
that we know how to
+   *     extract the code from
+   */
+  public static int extractHttpCodeFromCloudException(RuntimeException rex) {
+    return switch (rex) {
+      case S3Exception s3e -> s3e.statusCode();
+      case HttpResponseException hre -> hre.getResponse().getStatusCode();
+      case StorageException se -> se.getCode();
+      default -> UNKNOWN_CLOUD_HTTP_CODE;
+    };
+  }
+
   static int mapCloudExceptionToResponseCode(RuntimeException rex) {
     if (doesAnyThrowableContainAccessDeniedHint(rex)) {
       return Status.FORBIDDEN.getStatusCode();
     }
 
-    int httpCode =
-        switch (rex) {
-          case S3Exception s3e -> s3e.statusCode();
-          case HttpResponseException hre -> hre.getResponse().getStatusCode();
-          case StorageException se -> se.getCode();
-          default -> -1;
-        };
+    int httpCode = extractHttpCodeFromCloudException(rex);
     Status httpStatus = Status.fromStatusCode(httpCode);
     Status.Family httpFamily = Status.Family.familyOf(httpCode);
 
diff --git a/service/common/src/test/java/org/apache/polaris/service/exception/IcebergExceptionMapperTest.java b/service/common/src/test/java/org/apache/polaris/service/exception/IcebergExceptionMapperTest.java
index e991b79d..3642234c 100644
--- a/service/common/src/test/java/org/apache/polaris/service/exception/IcebergExceptionMapperTest.java
+++ b/service/common/src/test/java/org/apache/polaris/service/exception/IcebergExceptionMapperTest.java
@@ -19,12 +19,9 @@
 package org.apache.polaris.service.exception;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 import com.azure.core.exception.AzureException;
 import com.azure.core.exception.HttpResponseException;
-import com.azure.core.http.HttpResponse;
 import com.google.cloud.storage.StorageException;
 import jakarta.ws.rs.core.Response;
 import java.util.Map;
@@ -65,7 +62,8 @@ public class IcebergExceptionMapperTest {
                 entry ->
                     Stream.of(
                         Arguments.of(
-                            new HttpResponseException("", 
mockAzureResponse(entry.getKey()), ""),
+                            new HttpResponseException(
+                                "", new FakeAzureHttpResponse(entry.getKey()), 
""),
                             entry.getValue()),
                         Arguments.of(
                             
S3Exception.builder().message("").statusCode(entry.getKey()).build(),
@@ -82,17 +80,4 @@ public class IcebergExceptionMapperTest {
       
assertThat(response.getEntity()).extracting("message").isEqualTo(ex.getMessage());
     }
   }
-
-  /**
-   * Creates a mock of the Azure-specific HttpResponse object, as it's quite 
difficult to construct
-   * a "real" one.
-   *
-   * @param statusCode
-   * @return
-   */
-  private static HttpResponse mockAzureResponse(int statusCode) {
-    HttpResponse res = mock(HttpResponse.class);
-    when(res.getStatusCode()).thenReturn(statusCode);
-    return res;
-  }
 }
diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/exception/FakeAzureHttpResponse.java b/service/common/src/testFixtures/java/org/apache/polaris/service/exception/FakeAzureHttpResponse.java
new file mode 100644
index 00000000..da83f803
--- /dev/null
+++ b/service/common/src/testFixtures/java/org/apache/polaris/service/exception/FakeAzureHttpResponse.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.exception;
+
+import com.azure.core.http.HttpHeaders;
+import com.azure.core.http.HttpResponse;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/** Fake version of an Azure HttpResponse that can be forced to return a fixed 
statusCode. */
+public class FakeAzureHttpResponse extends HttpResponse {
+  private final int mockStatusCode;
+
+  public FakeAzureHttpResponse(int mockStatusCode) {
+    super(null);
+    this.mockStatusCode = mockStatusCode;
+  }
+
+  @Override
+  public int getStatusCode() {
+    return mockStatusCode;
+  }
+
+  @Override
+  @Deprecated
+  public String getHeaderValue(String name) {
+    return "";
+  }
+
+  @Override
+  public HttpHeaders getHeaders() {
+    return null;
+  }
+
+  @Override
+  public Flux<ByteBuffer> getBody() {
+    return null;
+  }
+
+  @Override
+  public Mono<byte[]> getBodyAsByteArray() {
+    return null;
+  }
+
+  @Override
+  public Mono<String> getBodyAsString() {
+    return null;
+  }
+
+  @Override
+  public Mono<String> getBodyAsString(Charset charset) {
+    return null;
+  }
+}

Reply via email to