This is an automated email from the ASF dual-hosted git repository.
dimas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new a4c62b33 More sensible BasePolarisCatalog retry behavior (#1046)
a4c62b33 is described below
commit a4c62b3330cd80e567a3a9bd197c5110cf123b09
Author: Andrew Guterman <[email protected]>
AuthorDate: Wed Feb 26 14:34:34 2025 -0800
More sensible BasePolarisCatalog retry behavior (#1046)
* More sensible BasePolarisCatalog retry behavior
* Reuse AWS/GCP retryable property
* Update default to 2
---
.../apache/polaris/core/PolarisConfiguration.java | 8 +++
.../quarkus/catalog/BasePolarisCatalogTest.java | 70 +++++++++++++++------
service/common/build.gradle.kts | 5 ++
.../service/catalog/BasePolarisCatalog.java | 51 ++++-----------
.../service/exception/IcebergExceptionMapper.java | 69 ++++++++++++++++++---
.../exception/IcebergExceptionMapperTest.java | 19 +-----
.../service/exception/FakeAzureHttpResponse.java | 72 ++++++++++++++++++++++
7 files changed, 212 insertions(+), 82 deletions(-)
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/PolarisConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/PolarisConfiguration.java
index c57d58da..ca1962e3 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/PolarisConfiguration.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/PolarisConfiguration.java
@@ -264,4 +264,12 @@ public class PolarisConfiguration<T> {
+ STORAGE_CREDENTIAL_DURATION_SECONDS.key)
.defaultValue(30 * 60) // 30 minutes
.build();
+
+ public static final PolarisConfiguration<Integer> MAX_METADATA_REFRESH_RETRIES =
+ PolarisConfiguration.<Integer>builder()
+ .key("MAX_METADATA_REFRESH_RETRIES")
+ .description(
+ "How many times to retry refreshing metadata when the previous error was retryable")
+ .defaultValue(2)
+ .build();
}
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java
index 370ede66..7f5701e2 100644
--- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/BasePolarisCatalogTest.java
@@ -26,8 +26,9 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import com.azure.core.exception.HttpResponseException;
+import com.google.cloud.storage.StorageException;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterators;
import io.quarkus.test.junit.QuarkusMock;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
@@ -39,12 +40,14 @@ import java.lang.reflect.Method;
import java.time.Clock;
import java.util.Arrays;
import java.util.EnumMap;
-import java.util.Iterator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.function.Function;
import java.util.function.Supplier;
+import java.util.stream.Stream;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
@@ -104,6 +107,7 @@ import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
+import org.apache.polaris.service.exception.FakeAzureHttpResponse;
import org.apache.polaris.service.exception.IcebergExceptionMapper;
import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
import org.apache.polaris.service.task.TableCleanupTaskHandler;
@@ -120,7 +124,12 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;
+import software.amazon.awssdk.core.exception.NonRetryableException;
+import software.amazon.awssdk.core.exception.RetryableException;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
@@ -1517,23 +1526,46 @@ public class BasePolarisCatalogTest extends CatalogTests<BasePolarisCatalog> {
}
}
- @Test
- public void testRetriableException() {
- Iterator<String> accessDeniedHint =
- Iterators.cycle(IcebergExceptionMapper.getAccessDeniedHints());
- RuntimeException s3Exception = new RuntimeException(accessDeniedHint.next());
- RuntimeException azureBlobStorageException = new RuntimeException(accessDeniedHint.next());
- RuntimeException gcsException = new RuntimeException(accessDeniedHint.next());
- RuntimeException otherException = new RuntimeException(new IOException("Connection reset"));
- Assertions.assertThat(BasePolarisCatalog.SHOULD_RETRY_REFRESH_PREDICATE.test(s3Exception))
- .isFalse();
- Assertions.assertThat(
- BasePolarisCatalog.SHOULD_RETRY_REFRESH_PREDICATE.test(azureBlobStorageException))
- .isFalse();
- Assertions.assertThat(BasePolarisCatalog.SHOULD_RETRY_REFRESH_PREDICATE.test(gcsException))
- .isFalse();
- Assertions.assertThat(BasePolarisCatalog.SHOULD_RETRY_REFRESH_PREDICATE.test(otherException))
- .isTrue();
+ @ParameterizedTest
+ @MethodSource
+ public void testRetriableException(Exception exception, boolean shouldRetry) {
+ Assertions.assertThat(BasePolarisCatalog.SHOULD_RETRY_REFRESH_PREDICATE.test(exception))
+ .isEqualTo(shouldRetry);
+ }
+
+ static Stream<Arguments> testRetriableException() {
+ Set<Integer> NON_RETRYABLE_CODES = Set.of(401, 403, 404);
+ Set<Integer> RETRYABLE_CODES = Set.of(408, 504);
+
+ // Create a map of HTTP code returned from a cloud provider to whether it should be retried
+ Map<Integer, Boolean> cloudCodeMappings = new HashMap<>();
+ NON_RETRYABLE_CODES.forEach(code -> cloudCodeMappings.put(code, false));
+ RETRYABLE_CODES.forEach(code -> cloudCodeMappings.put(code, true));
+
+ return Stream.of(
+ Stream.of(
+ Arguments.of(new RuntimeException(new IOException("Connection reset")), true),
+ Arguments.of(RetryableException.builder().build(), true),
+ Arguments.of(NonRetryableException.builder().build(), false)),
+ IcebergExceptionMapper.getAccessDeniedHints().stream()
+ .map(hint -> Arguments.of(new RuntimeException(hint), false)),
+ cloudCodeMappings.entrySet().stream()
+ .flatMap(
+ entry ->
+ Stream.of(
+ Arguments.of(
+ new HttpResponseException(
+ "", new FakeAzureHttpResponse(entry.getKey()), ""),
+ entry.getValue()),
+ Arguments.of(
+ new StorageException(entry.getKey(), ""), entry.getValue()))),
+ IcebergExceptionMapper.RETRYABLE_AZURE_HTTP_CODES.stream()
+ .map(
+ code ->
+ Arguments.of(
+ new HttpResponseException("", new FakeAzureHttpResponse(code), ""),
+ true)))
+ .flatMap(Function.identity());
}
@Test
diff --git a/service/common/build.gradle.kts b/service/common/build.gradle.kts
index 3d6c90f4..a98ef50a 100644
--- a/service/common/build.gradle.kts
+++ b/service/common/build.gradle.kts
@@ -121,6 +121,11 @@ dependencies {
testFixturesImplementation("software.amazon.awssdk:sts")
testFixturesImplementation("software.amazon.awssdk:iam-policy-builder")
testFixturesImplementation("software.amazon.awssdk:s3")
+
+ testFixturesImplementation(platform(libs.azuresdk.bom))
+ testFixturesImplementation("com.azure:azure-core")
+ testFixturesImplementation("com.azure:azure-storage-blob")
+ testFixturesImplementation("com.azure:azure-storage-file-datalake")
}
tasks.named("javadoc") { dependsOn("jandex") }
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
index c42bb7fd..f06850d7 100644
--- a/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
@@ -18,6 +18,8 @@
*/
package org.apache.polaris.service.catalog;
+import static org.apache.polaris.service.exception.IcebergExceptionMapper.isStorageProviderRetryableException;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
@@ -104,13 +106,11 @@ import org.apache.polaris.core.storage.PolarisStorageIntegration;
import org.apache.polaris.core.storage.StorageLocation;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.catalog.io.FileIOUtil;
-import org.apache.polaris.service.exception.IcebergExceptionMapper;
import org.apache.polaris.service.task.TaskExecutor;
import org.apache.polaris.service.types.NotificationRequest;
import org.apache.polaris.service.types.NotificationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.core.exception.SdkException;
/** Defines the relationship between PolarisEntities and Iceberg's business logic. */
public class BasePolarisCatalog extends BaseMetastoreViewCatalog
@@ -134,8 +134,6 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog
"INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST";
static final boolean INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST_DEFAULT =
false;
- private static final int MAX_RETRIES = 12;
-
public static final Predicate<Exception> SHOULD_RETRY_REFRESH_PREDICATE =
ex -> {
// Default arguments from BaseMetastoreTableOperation only stop
retries on
@@ -146,7 +144,8 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog
&& !(ex instanceof AlreadyExistsException)
&& !(ex instanceof ForbiddenException)
&& !(ex instanceof UnprocessableEntityException)
- && isStorageProviderRetryableException(ex);
+ && (isStorageProviderRetryableException(ex)
+ || isStorageProviderRetryableException(ExceptionUtils.getRootCause(ex)));
};
private final PolarisEntityManager entityManager;
@@ -1258,7 +1257,7 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog
refreshFromMetadataLocation(
latestLocation,
SHOULD_RETRY_REFRESH_PREDICATE,
- MAX_RETRIES,
+ getMaxMetadataRefreshRetries(),
metadataLocation -> {
String latestLocationDir =
latestLocation.substring(0, latestLocation.lastIndexOf('/'));
@@ -1483,7 +1482,7 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog
refreshFromMetadataLocation(
latestLocation,
SHOULD_RETRY_REFRESH_PREDICATE,
- MAX_RETRIES,
+ getMaxMetadataRefreshRetries(),
metadataLocation -> {
String latestLocationDir =
latestLocation.substring(0, latestLocation.lastIndexOf('/'));
@@ -2097,37 +2096,11 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog
.getConfiguration(callContext.getPolarisCallContext(), configKey, defaultValue);
}
- /**
- * Check if the exception is retryable for the storage provider
- *
- * @param ex exception
- * @return true if the exception is retryable
- */
- private static boolean isStorageProviderRetryableException(Exception ex) {
- // For S3/Azure, the exception is not wrapped, while for GCP the exception is wrapped as a
- // RuntimeException
- Throwable rootCause = ExceptionUtils.getRootCause(ex);
- if (rootCause == null) {
- // no root cause, let it retry
- return true;
- }
- // only S3 SdkException has this retryable property
- if (rootCause instanceof SdkException && ((SdkException) rootCause).retryable()) {
- return true;
- }
- // add more cases here if needed
- // AccessDenied is not retryable
- return !isAccessDenied(rootCause.getMessage());
- }
-
- private static boolean isAccessDenied(String errorMsg) {
- // Corresponding error messages for storage providers Aws/Azure/Gcp
- boolean isAccessDenied =
- errorMsg != null && IcebergExceptionMapper.containsAnyAccessDeniedHint(errorMsg);
- if (isAccessDenied) {
- LOGGER.debug("Access Denied or Forbidden error: {}", errorMsg);
- return true;
- }
- return false;
+ private int getMaxMetadataRefreshRetries() {
+ return callContext
+ .getPolarisCallContext()
+ .getConfigurationStore()
+ .getConfiguration(
+ callContext.getPolarisCallContext(), PolarisConfiguration.MAX_METADATA_REFRESH_RETRIES);
}
}
diff --git a/service/common/src/main/java/org/apache/polaris/service/exception/IcebergExceptionMapper.java b/service/common/src/main/java/org/apache/polaris/service/exception/IcebergExceptionMapper.java
index 248aa7f5..25e33467 100644
--- a/service/common/src/main/java/org/apache/polaris/service/exception/IcebergExceptionMapper.java
+++ b/service/common/src/main/java/org/apache/polaris/service/exception/IcebergExceptionMapper.java
@@ -20,6 +20,7 @@ package org.apache.polaris.service.exception;
import com.azure.core.exception.AzureException;
import com.azure.core.exception.HttpResponseException;
+import com.google.cloud.BaseServiceException;
import com.google.cloud.storage.StorageException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
@@ -58,10 +59,23 @@ import org.apache.iceberg.rest.responses.ErrorResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
+import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.model.S3Exception;
@Provider
public class IcebergExceptionMapper implements ExceptionMapper<RuntimeException> {
+ /** Signifies that we could not extract an HTTP code from a given cloud exception */
+ public static final int UNKNOWN_CLOUD_HTTP_CODE = -1;
+
+ public static final Set<Integer> RETRYABLE_AZURE_HTTP_CODES =
+ Set.of(
+ Response.Status.REQUEST_TIMEOUT.getStatusCode(),
+ Response.Status.TOO_MANY_REQUESTS.getStatusCode(),
+ Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+ Response.Status.SERVICE_UNAVAILABLE.getStatusCode(),
+ Response.Status.GATEWAY_TIMEOUT.getStatusCode(),
+ IcebergExceptionMapper.UNKNOWN_CLOUD_HTTP_CODE);
+
private static final Logger LOGGER =
LoggerFactory.getLogger(IcebergExceptionMapper.class);
// Case-insensitive parts of exception messages that a request to a cloud provider was denied due
@@ -115,6 +129,36 @@ public class IcebergExceptionMapper implements ExceptionMapper<RuntimeException>
return ACCESS_DENIED_HINTS.stream().anyMatch(messageLower::contains);
}
+ /**
+ * Check if the exception is retryable for the storage provider
+ *
+ * @param ex exception
+ * @return true if the exception is retryable
+ */
+ public static boolean isStorageProviderRetryableException(Throwable ex) {
+ if (ex == null) {
+ return false;
+ }
+
+ if (ex.getMessage() != null && containsAnyAccessDeniedHint(ex.getMessage())) {
+ return false;
+ }
+
+ return switch (ex) {
+ // GCS
+ case BaseServiceException bse -> bse.isRetryable();
+
+ // S3
+ case SdkException se -> se.retryable();
+
+ // Azure exceptions don't have a retryable property so we just check the HTTP code
+ case HttpResponseException hre ->
+ RETRYABLE_AZURE_HTTP_CODES.contains(
+ IcebergExceptionMapper.extractHttpCodeFromCloudException(hre));
+ default -> true;
+ };
+ }
+
@VisibleForTesting
public static Collection<String> getAccessDeniedHints() {
return ImmutableSet.copyOf(ACCESS_DENIED_HINTS);
@@ -158,18 +202,29 @@ public class IcebergExceptionMapper implements ExceptionMapper<RuntimeException>
};
}
+ /**
+ * We typically call cloud providers over HTTP, so when there's an exception there's typically an
+ * associated HTTP code. This extracts the HTTP code if possible.
+ *
+ * @param rex The cloud provider exception
+ * @return UNKNOWN_CLOUD_HTTP_CODE if the exception is not a cloud exception that we know how to
+ * extract the code from
+ */
+ public static int extractHttpCodeFromCloudException(RuntimeException rex) {
+ return switch (rex) {
+ case S3Exception s3e -> s3e.statusCode();
+ case HttpResponseException hre -> hre.getResponse().getStatusCode();
+ case StorageException se -> se.getCode();
+ default -> UNKNOWN_CLOUD_HTTP_CODE;
+ };
+ }
+
static int mapCloudExceptionToResponseCode(RuntimeException rex) {
if (doesAnyThrowableContainAccessDeniedHint(rex)) {
return Status.FORBIDDEN.getStatusCode();
}
- int httpCode =
- switch (rex) {
- case S3Exception s3e -> s3e.statusCode();
- case HttpResponseException hre -> hre.getResponse().getStatusCode();
- case StorageException se -> se.getCode();
- default -> -1;
- };
+ int httpCode = extractHttpCodeFromCloudException(rex);
Status httpStatus = Status.fromStatusCode(httpCode);
Status.Family httpFamily = Status.Family.familyOf(httpCode);
diff --git a/service/common/src/test/java/org/apache/polaris/service/exception/IcebergExceptionMapperTest.java b/service/common/src/test/java/org/apache/polaris/service/exception/IcebergExceptionMapperTest.java
index e991b79d..3642234c 100644
--- a/service/common/src/test/java/org/apache/polaris/service/exception/IcebergExceptionMapperTest.java
+++ b/service/common/src/test/java/org/apache/polaris/service/exception/IcebergExceptionMapperTest.java
@@ -19,12 +19,9 @@
package org.apache.polaris.service.exception;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import com.azure.core.exception.AzureException;
import com.azure.core.exception.HttpResponseException;
-import com.azure.core.http.HttpResponse;
import com.google.cloud.storage.StorageException;
import jakarta.ws.rs.core.Response;
import java.util.Map;
@@ -65,7 +62,8 @@ public class IcebergExceptionMapperTest {
entry ->
Stream.of(
Arguments.of(
- new HttpResponseException("", mockAzureResponse(entry.getKey()), ""),
+ new HttpResponseException(
+ "", new FakeAzureHttpResponse(entry.getKey()), ""),
entry.getValue()),
Arguments.of(
S3Exception.builder().message("").statusCode(entry.getKey()).build(),
@@ -82,17 +80,4 @@ public class IcebergExceptionMapperTest {
assertThat(response.getEntity()).extracting("message").isEqualTo(ex.getMessage());
}
}
-
- /**
- * Creates a mock of the Azure-specific HttpResponse object, as it's quite difficult to construct
- * a "real" one.
- *
- * @param statusCode
- * @return
- */
- private static HttpResponse mockAzureResponse(int statusCode) {
- HttpResponse res = mock(HttpResponse.class);
- when(res.getStatusCode()).thenReturn(statusCode);
- return res;
- }
}
diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/exception/FakeAzureHttpResponse.java b/service/common/src/testFixtures/java/org/apache/polaris/service/exception/FakeAzureHttpResponse.java
new file mode 100644
index 00000000..da83f803
--- /dev/null
+++ b/service/common/src/testFixtures/java/org/apache/polaris/service/exception/FakeAzureHttpResponse.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.exception;
+
+import com.azure.core.http.HttpHeaders;
+import com.azure.core.http.HttpResponse;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/** Fake version of an Azure HttpResponse that can be forced to return a fixed statusCode. */
+public class FakeAzureHttpResponse extends HttpResponse {
+ private final int mockStatusCode;
+
+ public FakeAzureHttpResponse(int mockStatusCode) {
+ super(null);
+ this.mockStatusCode = mockStatusCode;
+ }
+
+ @Override
+ public int getStatusCode() {
+ return mockStatusCode;
+ }
+
+ @Override
+ @Deprecated
+ public String getHeaderValue(String name) {
+ return "";
+ }
+
+ @Override
+ public HttpHeaders getHeaders() {
+ return null;
+ }
+
+ @Override
+ public Flux<ByteBuffer> getBody() {
+ return null;
+ }
+
+ @Override
+ public Mono<byte[]> getBodyAsByteArray() {
+ return null;
+ }
+
+ @Override
+ public Mono<String> getBodyAsString() {
+ return null;
+ }
+
+ @Override
+ public Mono<String> getBodyAsString(Charset charset) {
+ return null;
+ }
+}