This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 4f5768738e Add idempotency adapter and E2E coverage (#14773)
4f5768738e is described below
commit 4f5768738e79487058fd65fba3faa19612a53391
Author: Huaxin Gao <[email protected]>
AuthorDate: Fri Jan 16 09:52:55 2026 -0800
Add idempotency adapter and E2E coverage (#14773)
* rebase
* rebase to upstream
* address comments
* address comments
* add back idempotency tests
* address comments
---
.../org/apache/iceberg/rest/CatalogHandlers.java | 151 +++++++++
.../apache/iceberg/rest/RESTCatalogAdapter.java | 126 ++++---
.../org/apache/iceberg/rest/TestRESTCatalog.java | 364 +++++++++++++++++++--
3 files changed, 573 insertions(+), 68 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
index 229497576a..18de8493f4 100644
--- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
@@ -24,6 +24,8 @@ import static
org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
import static
org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Collections;
@@ -33,10 +35,14 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;
import org.apache.iceberg.BaseMetadataTable;
@@ -66,6 +72,7 @@ import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.io.CloseableIterable;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -108,8 +115,152 @@ public class CatalogHandlers {
InMemoryPlanningState.getInstance();
private static final ExecutorService ASYNC_PLANNING_POOL =
Executors.newSingleThreadExecutor();
+ // Advanced idempotency store with TTL and in-flight coalescing.
+ //
+ // Note: This is a simple in-memory implementation meant for tests and
lightweight usage.
+ // Production servers should provide a durable store.
+ private static final ConcurrentMap<String, IdempotencyEntry>
IDEMPOTENCY_STORE =
+ Maps.newConcurrentMap(); // keyed by the value of the RESTUtil.IDEMPOTENCY_KEY_HEADER request header
+ private static volatile long idempotencyLifetimeMillis =
TimeUnit.MINUTES.toMillis(30); // a finalized entry counts as expired once this many millis have passed since it was first seen
+
private CatalogHandlers() {}
+  /**
+   * Runs {@code action} under idempotency protection and returns its response.
+   *
+   * <p>When the request carries no idempotency key header the action is simply executed.
+   * Otherwise the first request for a key executes the action, and requests that reuse the key
+   * receive the recorded response (or the recorded exception) until the entry expires.
+   *
+   * @param httpRequest request whose headers may carry the idempotency key
+   * @param action the operation to execute at most once per key
+   * @return the response produced by {@code action}, or the recorded response on replay
+   */
+  static <T extends RESTResponse> T withIdempotency(HTTPRequest httpRequest, Supplier<T> action) {
+    // No unchecked operation happens in this method, so no warning suppression is needed here;
+    // the only unchecked cast lives in withIdempotencyInternal.
+    return withIdempotencyInternal(httpRequest, action);
+  }
+
+  /**
+   * Runs a mutation that produces no response under idempotency protection.
+   *
+   * @param httpRequest request whose headers may carry the idempotency key
+   * @param action the operation to execute at most once per key
+   */
+  static void withIdempotency(HTTPRequest httpRequest, Runnable action) {
+    // Adapt the Runnable to a Supplier; Boolean.TRUE is only a placeholder result.
+    Supplier<Boolean> asSupplier =
+        () -> {
+          action.run();
+          return Boolean.TRUE;
+        };
+    withIdempotencyInternal(httpRequest, asSupplier);
+  }
+
+  /**
+   * Core idempotency logic shared by both {@code withIdempotency} overloads.
+   *
+   * <p>Requests without the idempotency key header run {@code action} directly. For keyed
+   * requests, the request that installs the IN_PROGRESS entry executes the action and finalizes
+   * the entry; every other request with the same key waits for finalization (if needed) and then
+   * replays the recorded response or rethrows the recorded exception.
+   *
+   * @param httpRequest request whose headers may carry the idempotency key
+   * @param action the operation to execute at most once per key
+   * @return the result of {@code action}, or the recorded result on replay
+   * @throws RuntimeException the failure thrown by {@code action}, or the recorded failure on
+   *     replay
+   */
+  @SuppressWarnings("unchecked")
+  private static <T> T withIdempotencyInternal(HTTPRequest httpRequest, Supplier<T> action) {
+    Optional<HTTPHeaders.HTTPHeader> keyHeader =
+        httpRequest.headers().firstEntry(RESTUtil.IDEMPOTENCY_KEY_HEADER);
+    if (keyHeader.isEmpty()) {
+      return action.get();
+    }
+
+    String key = keyHeader.get().value();
+
+    // The "first" request for this key is the one that wins IDEMPOTENCY_STORE.compute(...) and
+    // creates (or replaces an expired) IN_PROGRESS entry. Only that request executes the action.
+    AtomicBoolean isFirst = new AtomicBoolean(false);
+    IdempotencyEntry entry =
+        IDEMPOTENCY_STORE.compute(
+            key,
+            (k, current) -> {
+              if (current == null || current.isExpired()) {
+                isFirst.set(true);
+                return IdempotencyEntry.inProgress();
+              }
+              return current;
+            });
+
+    if (!isFirst.get()) {
+      // Replay path. Skip the wait when the entry is already finalized so that a finalized
+      // result can still be replayed on a thread whose interrupt flag is set.
+      if (entry.status != IdempotencyEntry.Status.FINALIZED) {
+        // In-flight coalescing: wait for the first request to finalize
+        entry.awaitFinalization();
+      }
+
+      if (entry.error != null) {
+        throw entry.error;
+      }
+      return (T) entry.responseBody;
+    }
+
+    // First request: execute the action and finalize the entry
+    try {
+      T res = action.get();
+      entry.finalizeSuccess(res);
+      return res;
+    } catch (RuntimeException e) {
+      entry.finalizeError(e);
+      throw e;
+    } catch (Error e) {
+      // Without this branch an Error would leave the entry IN_PROGRESS forever: it never
+      // expires and every request reusing the key would block on the latch. Drop the entry so a
+      // later retry re-executes, and release current waiters with a failure.
+      IDEMPOTENCY_STORE.remove(key, entry);
+      entry.finalizeError(
+          new IllegalStateException("Idempotent request failed with a fatal error", e));
+      throw e;
+    }
+  }
+
+  /**
+   * Overrides the lifetime of idempotency entries from an ISO-8601 duration string.
+   *
+   * <p>A {@code null} argument leaves the current lifetime untouched.
+   *
+   * @param isoDuration duration text accepted by {@link Duration#parse(CharSequence)}, or null
+   * @throws IllegalArgumentException if the text cannot be converted to milliseconds
+   */
+  @VisibleForTesting
+  static void setIdempotencyLifetimeFromIso(String isoDuration) {
+    if (isoDuration != null) {
+      long parsedMillis;
+      try {
+        parsedMillis = Duration.parse(isoDuration).toMillis();
+      } catch (Exception e) {
+        throw new IllegalArgumentException("Invalid idempotency lifetime: " + isoDuration, e);
+      }
+
+      idempotencyLifetimeMillis = parsedMillis;
+    }
+  }
+
+  /**
+   * State recorded for one idempotency key: whether the owning request is still running and,
+   * once finalized, the response body or the exception to replay.
+   */
+  private static final class IdempotencyEntry {
+    enum Status {
+      IN_PROGRESS,
+      FINALIZED
+    }
+
+    // Released exactly once, when the entry is finalized.
+    private final CountDownLatch latch = new CountDownLatch(1);
+    private final long firstSeenMillis = System.currentTimeMillis();
+    private volatile Status status;
+    private volatile Object responseBody;
+    private volatile RuntimeException error;
+
+    private IdempotencyEntry(Status initialStatus) {
+      this.status = initialStatus;
+    }
+
+    static IdempotencyEntry inProgress() {
+      return new IdempotencyEntry(Status.IN_PROGRESS);
+    }
+
+    /** Records a successful outcome and wakes up any waiting requests. */
+    void finalizeSuccess(Object body) {
+      responseBody = body;
+      markFinalized();
+    }
+
+    /** Records a failed outcome and wakes up any waiting requests. */
+    void finalizeError(RuntimeException cause) {
+      error = cause;
+      markFinalized();
+    }
+
+    // The outcome field is written before the status flips, and the latch opens last.
+    private void markFinalized() {
+      status = Status.FINALIZED;
+      latch.countDown();
+    }
+
+    /** Blocks until the entry has been finalized. */
+    void awaitFinalization() {
+      try {
+        latch.await();
+      } catch (InterruptedException interrupted) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(
+            "Interrupted while waiting for idempotent request to complete", interrupted);
+      }
+    }
+
+    /** Only finalized entries can expire; the lifetime is counted from when the key was first seen. */
+    boolean isExpired() {
+      if (status == Status.FINALIZED) {
+        Instant firstSeen = Instant.ofEpochMilli(firstSeenMillis);
+        return Instant.now().isAfter(firstSeen.plusMillis(idempotencyLifetimeMillis));
+      }
+
+      return false;
+    }
+  }
+
/**
* Exception used to avoid retrying commits when assertions fail.
*
diff --git a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
index e62937b6df..0600ef5515 100644
--- a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
+++ b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
@@ -207,8 +207,11 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
case CREATE_NAMESPACE:
if (asNamespaceCatalog != null) {
CreateNamespaceRequest request =
castRequest(CreateNamespaceRequest.class, body);
- return castResponse(
- responseType,
CatalogHandlers.createNamespace(asNamespaceCatalog, request));
+ return CatalogHandlers.withIdempotency(
+ httpRequest,
+ () ->
+ castResponse(
+ responseType,
CatalogHandlers.createNamespace(asNamespaceCatalog, request)));
}
break;
@@ -229,7 +232,9 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
case DROP_NAMESPACE:
if (asNamespaceCatalog != null) {
- CatalogHandlers.dropNamespace(asNamespaceCatalog,
namespaceFromPathVars(vars));
+ CatalogHandlers.withIdempotency(
+ httpRequest,
+ () -> CatalogHandlers.dropNamespace(asNamespaceCatalog,
namespaceFromPathVars(vars)));
return null;
}
break;
@@ -239,9 +244,13 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
Namespace namespace = namespaceFromPathVars(vars);
UpdateNamespacePropertiesRequest request =
castRequest(UpdateNamespacePropertiesRequest.class, body);
- return castResponse(
- responseType,
- CatalogHandlers.updateNamespaceProperties(asNamespaceCatalog,
namespace, request));
+ return CatalogHandlers.withIdempotency(
+ httpRequest,
+ () ->
+ castResponse(
+ responseType,
+ CatalogHandlers.updateNamespaceProperties(
+ asNamespaceCatalog, namespace, request)));
}
break;
@@ -268,19 +277,29 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
return castResponse(
responseType, CatalogHandlers.stageTableCreate(catalog,
namespace, request));
} else {
- LoadTableResponse response = CatalogHandlers.createTable(catalog,
namespace, request);
- responseHeaders.accept(
- ImmutableMap.of(HttpHeaders.ETAG,
ETagProvider.of(response.metadataLocation())));
- return castResponse(responseType, response);
+ return CatalogHandlers.withIdempotency(
+ httpRequest,
+ () -> {
+ LoadTableResponse response =
+ CatalogHandlers.createTable(catalog, namespace, request);
+ responseHeaders.accept(
+ ImmutableMap.of(
+ HttpHeaders.ETAG,
ETagProvider.of(response.metadataLocation())));
+ return castResponse(responseType, response);
+ });
}
}
case DROP_TABLE:
{
if (PropertyUtil.propertyAsBoolean(vars, "purgeRequested", false)) {
- CatalogHandlers.purgeTable(catalog, tableIdentFromPathVars(vars));
+ CatalogHandlers.withIdempotency(
+ httpRequest,
+ () -> CatalogHandlers.purgeTable(catalog,
tableIdentFromPathVars(vars)));
} else {
- CatalogHandlers.dropTable(catalog, tableIdentFromPathVars(vars));
+ CatalogHandlers.withIdempotency(
+ httpRequest,
+ () -> CatalogHandlers.dropTable(catalog,
tableIdentFromPathVars(vars)));
}
return null;
}
@@ -352,36 +371,47 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
case REGISTER_TABLE:
{
- LoadTableResponse response =
- CatalogHandlers.registerTable(
- catalog,
- namespaceFromPathVars(vars),
- castRequest(RegisterTableRequest.class, body));
-
- responseHeaders.accept(
- ImmutableMap.of(HttpHeaders.ETAG,
ETagProvider.of(response.metadataLocation())));
-
- return castResponse(responseType, response);
+ return CatalogHandlers.withIdempotency(
+ httpRequest,
+ () -> {
+ LoadTableResponse response =
+ CatalogHandlers.registerTable(
+ catalog,
+ namespaceFromPathVars(vars),
+ castRequest(RegisterTableRequest.class, body));
+
+ responseHeaders.accept(
+ ImmutableMap.of(
+ HttpHeaders.ETAG,
ETagProvider.of(response.metadataLocation())));
+
+ return castResponse(responseType, response);
+ });
}
case UPDATE_TABLE:
{
- LoadTableResponse response =
- CatalogHandlers.updateTable(
- catalog,
- tableIdentFromPathVars(vars),
- castRequest(UpdateTableRequest.class, body));
-
- responseHeaders.accept(
- ImmutableMap.of(HttpHeaders.ETAG,
ETagProvider.of(response.metadataLocation())));
-
- return castResponse(responseType, response);
+ return CatalogHandlers.withIdempotency(
+ httpRequest,
+ () -> {
+ LoadTableResponse response =
+ CatalogHandlers.updateTable(
+ catalog,
+ tableIdentFromPathVars(vars),
+ castRequest(UpdateTableRequest.class, body));
+
+ responseHeaders.accept(
+ ImmutableMap.of(
+ HttpHeaders.ETAG,
ETagProvider.of(response.metadataLocation())));
+
+ return castResponse(responseType, response);
+ });
}
case RENAME_TABLE:
{
RenameTableRequest request = castRequest(RenameTableRequest.class,
body);
- CatalogHandlers.renameTable(catalog, request);
+ CatalogHandlers.withIdempotency(
+ httpRequest, () -> CatalogHandlers.renameTable(catalog,
request));
return null;
}
@@ -395,7 +425,7 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
case COMMIT_TRANSACTION:
{
CommitTransactionRequest request =
castRequest(CommitTransactionRequest.class, body);
- commitTransaction(catalog, request);
+ CatalogHandlers.withIdempotency(httpRequest, () ->
commitTransaction(catalog, request));
return null;
}
@@ -422,8 +452,12 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
if (null != asViewCatalog) {
Namespace namespace = namespaceFromPathVars(vars);
CreateViewRequest request = castRequest(CreateViewRequest.class,
body);
- return castResponse(
- responseType, CatalogHandlers.createView(asViewCatalog,
namespace, request));
+ return CatalogHandlers.withIdempotency(
+ httpRequest,
+ () ->
+ castResponse(
+ responseType,
+ CatalogHandlers.createView(asViewCatalog, namespace,
request)));
}
break;
}
@@ -451,8 +485,11 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
if (null != asViewCatalog) {
TableIdentifier ident = viewIdentFromPathVars(vars);
UpdateTableRequest request = castRequest(UpdateTableRequest.class,
body);
- return castResponse(
- responseType, CatalogHandlers.updateView(asViewCatalog, ident,
request));
+ return CatalogHandlers.withIdempotency(
+ httpRequest,
+ () ->
+ castResponse(
+ responseType,
CatalogHandlers.updateView(asViewCatalog, ident, request)));
}
break;
}
@@ -461,7 +498,8 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
{
if (null != asViewCatalog) {
RenameTableRequest request = castRequest(RenameTableRequest.class,
body);
- CatalogHandlers.renameView(asViewCatalog, request);
+ CatalogHandlers.withIdempotency(
+ httpRequest, () -> CatalogHandlers.renameView(asViewCatalog,
request));
return null;
}
break;
@@ -470,7 +508,9 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
case DROP_VIEW:
{
if (null != asViewCatalog) {
- CatalogHandlers.dropView(asViewCatalog,
viewIdentFromPathVars(vars));
+ CatalogHandlers.withIdempotency(
+ httpRequest,
+ () -> CatalogHandlers.dropView(asViewCatalog,
viewIdentFromPathVars(vars)));
return null;
}
break;
@@ -568,8 +608,10 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
vars.putAll(request.queryParameters());
vars.putAll(routeAndVars.second());
- return handleRequest(
- routeAndVars.first(), vars.build(), request, responseType,
responseHeaders);
+ T resp =
+ handleRequest(
+ routeAndVars.first(), vars.build(), request, responseType,
responseHeaders);
+ return resp;
} catch (RuntimeException e) {
configureResponseFromException(e, errorBuilder);
}
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index d202680e56..40dc050311 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.rest;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode;
import static org.assertj.core.api.InstanceOfAssertFactories.map;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
@@ -69,28 +70,36 @@ import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdatePartitionSpec;
import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SessionCatalog;
import org.apache.iceberg.catalog.TableCommit;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.NotAuthorizedException;
import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.exceptions.RESTException;
import org.apache.iceberg.exceptions.ServiceFailureException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.inmemory.InMemoryCatalog;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.HTTPRequest.HTTPMethod;
import org.apache.iceberg.rest.RESTCatalogProperties.SnapshotMode;
+import org.apache.iceberg.rest.auth.AuthManager;
+import org.apache.iceberg.rest.auth.AuthManagers;
+import org.apache.iceberg.rest.auth.AuthSession;
import org.apache.iceberg.rest.auth.AuthSessionUtil;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.rest.responses.ConfigResponse;
import org.apache.iceberg.rest.responses.CreateNamespaceResponse;
@@ -100,6 +109,7 @@ import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.rest.responses.OAuthTokenResponse;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.awaitility.Awaitility;
import org.eclipse.jetty.server.Server;
@@ -124,12 +134,134 @@ public class TestRESTCatalog extends
CatalogTests<RESTCatalog> {
RESTCatalogProperties.NAMESPACE_SEPARATOR,
RESTCatalogAdapter.NAMESPACE_SEPARATOR_URLENCODED_UTF_8));
+  /** Bundles the table identifier, REST client and request headers used by one idempotency test. */
+  private static final class IdempotentEnv {
+    private final TableIdentifier ident;
+    private final RESTClient http;
+    private final Map<String, String> headers;
+
+    private IdempotentEnv(
+        TableIdentifier tableIdent, RESTClient client, Map<String, String> requestHeaders) {
+      this.ident = tableIdent;
+      this.http = client;
+      this.headers = requestHeaders;
+    }
+  }
+
+ /**
+ * Test-only adapter that keeps request/response round-trip serialization and header validation
+ * from the base test setup, while also allowing specific tests to inject transient failures.
+ *
+ * <p>A failure registered through {@code simulateFailureOnFirstSuccessForKey} is thrown once,
+ * after the matching request has already been handled, and is then forgotten.
+ */
+ private static class HeaderValidatingAdapter extends RESTCatalogAdapter {
+ private final HTTPHeaders catalogHeaders;
+ private final HTTPHeaders contextHeaders;
+ private final java.util.concurrent.ConcurrentMap<String, RuntimeException>
+ simulateFailureOnFirstSuccessByKey = new
java.util.concurrent.ConcurrentHashMap<>();
+
+ HeaderValidatingAdapter(
+ Catalog catalog, HTTPHeaders catalogHeaders, HTTPHeaders
contextHeaders) {
+ super(catalog);
+ this.catalogHeaders = catalogHeaders;
+ this.contextHeaders = contextHeaders;
+ }
+
+ /**
+ * Test helper to simulate a transient failure after the first successful mutation for a key.
+ *
+ * <p>Useful to validate that idempotency correctly replays a finalized result when the client
+ * retries after a post-success transient failure.
+ *
+ * @param key idempotency key to arm; must not be null
+ * @param failure exception to throw once for that key; must not be null
+ */
+ public void simulateFailureOnFirstSuccessForKey(String key,
RuntimeException failure) {
+ Preconditions.checkArgument(key != null, "Invalid idempotency key:
null");
+ Preconditions.checkArgument(failure != null, "Invalid failure: null");
+ simulateFailureOnFirstSuccessByKey.put(key, failure);
+ }
+
+ /** Test helper to simulate a transient 503 after the first successful mutation for a key. */
+ public void simulate503OnFirstSuccessForKey(String key) {
+ simulateFailureOnFirstSuccessForKey(
+ key,
+ new CommitStateUnknownException(
+ new RuntimeException("simulated transient 503 after success")));
+ }
+
+ @Override
+ public <T extends RESTResponse> T execute(
+ HTTPRequest request,
+ Class<T> responseType,
+ Consumer<ErrorResponse> errorHandler,
+ Consumer<Map<String, String>> responseHeaders) {
+ if (!ResourcePaths.tokens().equals(request.path())) { // token requests skip header validation
+ if (ResourcePaths.config().equals(request.path())) {
+
assertThat(request.headers().entries()).containsAll(catalogHeaders.entries());
+ } else {
+
assertThat(request.headers().entries()).containsAll(contextHeaders.entries());
+ }
+ }
+
+ Object body = roundTripSerialize(request.body(), "request");
+ HTTPRequest req =
ImmutableHTTPRequest.builder().from(request).body(body).build();
+ T response = super.execute(req, responseType, errorHandler,
responseHeaders);
+ return roundTripSerialize(response, "response");
+ }
+
+ @Override
+ protected <T extends RESTResponse> T execute(
+ HTTPRequest request,
+ Class<T> responseType,
+ Consumer<ErrorResponse> errorHandler,
+ Consumer<Map<String, String>> responseHeaders,
+ ParserContext parserContext) { // parserContext is not used by this override
+ ErrorResponse.Builder errorBuilder = ErrorResponse.builder();
+ Pair<Route, Map<String, String>> routeAndVars =
Route.from(request.method(), request.path());
+ if (routeAndVars != null) {
+ try {
+ ImmutableMap.Builder<String, String> vars = ImmutableMap.builder();
+ vars.putAll(request.queryParameters());
+ vars.putAll(routeAndVars.second());
+
+ T resp =
+ handleRequest(
+ routeAndVars.first(), vars.build(), request, responseType,
responseHeaders);
+
+ // For tests: simulate a transient 503 after the first successful
mutation for a key.
+ Optional<HTTPHeaders.HTTPHeader> keyHeader =
+ request.headers().firstEntry(RESTUtil.IDEMPOTENCY_KEY_HEADER);
+ boolean isMutation =
+ request.method() == HTTPMethod.POST || request.method() ==
HTTPMethod.DELETE;
+ if (isMutation && keyHeader.isPresent()) {
+ String key = keyHeader.get().value();
+ RuntimeException failure =
simulateFailureOnFirstSuccessByKey.remove(key); // remove() makes the injected failure fire only once per key
+ if (failure != null) {
+ throw failure; // caught below and turned into an error response
+ }
+ }
+
+ return resp;
+ } catch (RuntimeException e) {
+ configureResponseFromException(e, errorBuilder);
+ }
+
+ } else {
+ errorBuilder
+ .responseCode(400)
+ .withType("BadRequestException")
+ .withMessage(
+ String.format("No route for request: %s %s", request.method(),
request.path()));
+ }
+
+ ErrorResponse error = errorBuilder.build();
+ errorHandler.accept(error);
+
+ // if the error handler doesn't throw an exception, throw a generic one
+ throw new RESTException("Unhandled error: %s", error);
+ }
+ }
+
@TempDir public Path temp;
private RESTCatalog restCatalog;
private InMemoryCatalog backendCatalog;
private Server httpServer;
- private RESTCatalogAdapter adapterForRESTServer;
+ private HeaderValidatingAdapter adapterForRESTServer;
@BeforeEach
public void createCatalog() throws Exception {
@@ -156,31 +288,7 @@ public class TestRESTCatalog extends
CatalogTests<RESTCatalog> {
"test-value"));
adapterForRESTServer =
- Mockito.spy(
- new RESTCatalogAdapter(backendCatalog) {
- @Override
- public <T extends RESTResponse> T execute(
- HTTPRequest request,
- Class<T> responseType,
- Consumer<ErrorResponse> errorHandler,
- Consumer<Map<String, String>> responseHeaders) {
- // this doesn't use a Mockito spy because this is used for
catalog tests, which have
- // different method calls
- if (!ResourcePaths.tokens().equals(request.path())) {
- if (ResourcePaths.config().equals(request.path())) {
-
assertThat(request.headers().entries()).containsAll(catalogHeaders.entries());
- } else {
-
assertThat(request.headers().entries()).containsAll(contextHeaders.entries());
- }
- }
-
- Object body = roundTripSerialize(request.body(), "request");
- HTTPRequest req =
ImmutableHTTPRequest.builder().from(request).body(body).build();
- T response = super.execute(req, responseType, errorHandler,
responseHeaders);
- T responseAfterSerialization = roundTripSerialize(response,
"response");
- return responseAfterSerialization;
- }
- });
+ Mockito.spy(new HeaderValidatingAdapter(backendCatalog,
catalogHeaders, contextHeaders));
ServletContextHandler servletContext =
new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
@@ -3313,6 +3421,136 @@ public class TestRESTCatalog extends
CatalogTests<RESTCatalog> {
local.dropTable(ident);
}
+  /** A second create that reuses the idempotency key must succeed instead of failing. */
+  @Test
+  public void testIdempotentDuplicateCreateReturnsCached() {
+    String key = "dup-create-key";
+    Namespace ns = Namespace.of("ns_dup");
+    IdempotentEnv env = idempotentEnv(key, ns, "t_dup");
+    CreateTableRequest req = createReq(env.ident);
+    String tablesPath = ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns);
+
+    // Initial create goes through
+    LoadTableResponse first =
+        env.http.post(
+            tablesPath,
+            req,
+            LoadTableResponse.class,
+            env.headers,
+            ErrorHandlers.tableErrorHandler());
+    assertThat(first).isNotNull();
+
+    // The adapter must have seen a POST to the tables path carrying all of the headers
+    verifyCreatePost(ns, env.headers);
+
+    // Same key again: the server answers successfully rather than reporting a conflict
+    LoadTableResponse second =
+        env.http.post(
+            tablesPath,
+            req,
+            LoadTableResponse.class,
+            env.headers,
+            ErrorHandlers.tableErrorHandler());
+    assertThat(second).isNotNull();
+  }
+
+  /** With a zero lifetime, a repeated key is no longer replayed and the create is re-executed. */
+  @Test
+  public void testIdempotencyKeyLifetimeExpiredTreatsAsNew() {
+    // A zero lifetime makes the recorded success expire right away
+    CatalogHandlers.setIdempotencyLifetimeFromIso("PT0S");
+    try {
+      String key = "expired-create-key";
+      Namespace ns = Namespace.of("ns_exp");
+      IdempotentEnv env = idempotentEnv(key, ns, "t_exp");
+      CreateTableRequest req = createReq(env.ident);
+      String tablesPath = ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns);
+
+      // Initial create goes through
+      LoadTableResponse created =
+          env.http.post(
+              tablesPath,
+              req,
+              LoadTableResponse.class,
+              env.headers,
+              ErrorHandlers.tableErrorHandler());
+      assertThat(created).isNotNull();
+
+      // The adapter must have seen a POST to the tables path carrying all of the headers
+      verifyCreatePost(ns, env.headers);
+
+      // Entry expired -> the repeat is executed as a fresh create and hits the existing table
+      assertThatThrownBy(
+              () ->
+                  env.http.post(
+                      tablesPath,
+                      req,
+                      LoadTableResponse.class,
+                      env.headers,
+                      ErrorHandlers.tableErrorHandler()))
+          .isInstanceOf(AlreadyExistsException.class)
+          .hasMessageContaining(env.ident.toString());
+    } finally {
+      // Put the default lifetime back so other tests are unaffected
+      CatalogHandlers.setIdempotencyLifetimeFromIso("PT30M");
+    }
+  }
+
+  /** A retry after an injected post-success failure must replay the recorded success. */
+  @Test
+  public void testIdempotentCreateReplayAfterSimulated503() {
+    // Arm a one-shot failure that fires after the first handled mutation for this key
+    String key = "idemp-create-503";
+    adapterForRESTServer.simulate503OnFirstSuccessForKey(key);
+    Namespace ns = Namespace.of("ns_idemp");
+    IdempotentEnv env = idempotentEnv(key, ns, "t_idemp");
+    CreateTableRequest req = createReq(env.ident);
+    String tablesPath = ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns);
+
+    // Attempt one: the create is handled, but the client sees the injected failure
+    assertThatThrownBy(
+            () ->
+                env.http.post(
+                    tablesPath,
+                    req,
+                    LoadTableResponse.class,
+                    env.headers,
+                    ErrorHandlers.tableErrorHandler()))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining("simulated transient 503");
+
+    // The adapter must have seen a POST to the tables path carrying all of the headers
+    verifyCreatePost(ns, env.headers);
+
+    // Attempt two with the same key: the recorded success is returned
+    LoadTableResponse replay =
+        env.http.post(
+            tablesPath,
+            req,
+            LoadTableResponse.class,
+            env.headers,
+            ErrorHandlers.tableErrorHandler());
+    assertThat(replay).isNotNull();
+  }
+
+  /** Dropping a table twice with the same idempotency key must not raise on the second call. */
+  @Test
+  public void testIdempotentDropDuplicateNoop() {
+    String key = "idemp-drop-void";
+    Namespace ns = Namespace.of("ns_void");
+    IdempotentEnv env = idempotentEnv(key, ns, "t_void");
+
+    // Set up the table that will be dropped
+    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()));
+    restCatalog.createTable(env.ident, schema, PartitionSpec.unpartitioned());
+
+    String path = ResourcePaths.forCatalogProperties(ImmutableMap.of()).table(env.ident);
+
+    // Drop number one removes the existing table
+    env.http.delete(path, null, env.headers, ErrorHandlers.tableErrorHandler());
+    assertThat(restCatalog.tableExists(env.ident)).isFalse();
+
+    // Drop number two reuses the key and must complete without an exception
+    assertThatCode(
+            () -> env.http.delete(path, null, env.headers, ErrorHandlers.tableErrorHandler()))
+        .doesNotThrowAnyException();
+  }
+
@Test
public void nestedNamespaceWithLegacySeparator() {
RESTCatalogAdapter adapter = Mockito.spy(new
RESTCatalogAdapter(backendCatalog));
@@ -3454,6 +3692,71 @@ public class TestRESTCatalog extends
CatalogTests<RESTCatalog> {
return local;
}
+  /**
+   * Builds an authenticated REST client against the test server together with request headers
+   * that include the given idempotency key.
+   */
+  private Pair<RESTClient, Map<String, String>> httpAndHeaders(String idempotencyKey) {
+    Map<String, String> requestHeaders =
+        ImmutableMap.of(
+            RESTUtil.IDEMPOTENCY_KEY_HEADER,
+            idempotencyKey,
+            "Authorization",
+            "Bearer client-credentials-token:sub=user",
+            "test-header",
+            "test-value");
+
+    String serverUri = httpServer.getURI().toString();
+    Map<String, String> clientConf =
+        ImmutableMap.of(
+            CatalogProperties.URI,
+            serverUri,
+            HTTPClient.REST_SOCKET_TIMEOUT_MS,
+            "600000",
+            HTTPClient.REST_CONNECTION_TIMEOUT_MS,
+            "600000",
+            "header.test-header",
+            "test-value");
+
+    RESTClient baseClient =
+        HTTPClient.builder(clientConf)
+            .uri(serverUri)
+            .withHeaders(RESTUtil.configHeaders(clientConf))
+            .build();
+
+    // Attach an auth session created by the "test" auth manager to the plain client
+    AuthManager authManager = AuthManagers.loadAuthManager("test", clientConf);
+    AuthSession session = authManager.initSession(baseClient, clientConf);
+    return Pair.of(baseClient.withAuthSession(session), requestHeaders);
+  }
+
+  /** Creates the namespace and pairs the table identifier with a client and its headers. */
+  private Pair<TableIdentifier, Pair<RESTClient, Map<String, String>>> prepareIdempotentEnv(
+      String key, Namespace ns, String tableName) {
+    TableIdentifier tableIdent = TableIdentifier.of(ns, tableName);
+    restCatalog.createNamespace(ns, ImmutableMap.of());
+    Pair<RESTClient, Map<String, String>> clientAndHeaders = httpAndHeaders(key);
+    return Pair.of(tableIdent, clientAndHeaders);
+  }
+
+  /** Flattens the nested pairs from {@code prepareIdempotentEnv} into an {@code IdempotentEnv}. */
+  private IdempotentEnv idempotentEnv(String key, Namespace ns, String tableName) {
+    Pair<TableIdentifier, Pair<RESTClient, Map<String, String>>> prepared =
+        prepareIdempotentEnv(key, ns, tableName);
+    RESTClient client = prepared.second().first();
+    Map<String, String> requestHeaders = prepared.second().second();
+    return new IdempotentEnv(prepared.first(), client, requestHeaders);
+  }
+
+  /** Builds a create request for an unpartitioned table with a single required int column. */
+  private static CreateTableRequest createReq(TableIdentifier ident) {
+    Schema singleIdColumn =
+        new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()));
+    return CreateTableRequest.builder()
+        .withName(ident.name())
+        .withSchema(singleIdColumn)
+        .withPartitionSpec(PartitionSpec.unpartitioned())
+        .build();
+  }
+
+  /** Verifies the adapter received at least one POST to the tables path with the given headers. */
+  private void verifyCreatePost(Namespace ns, Map<String, String> headers) {
+    String tablesPath = ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns);
+
+    // Matchers stay inline: Mockito records them in the order they are invoked.
+    verify(adapterForRESTServer, atLeastOnce())
+        .execute(
+            reqMatcherContainsHeaders(HTTPMethod.POST, tablesPath, headers),
+            eq(LoadTableResponse.class),
+            any(),
+            any());
+  }
+
@Test
@Override
public void testLoadTableWithMissingMetadataFile(@TempDir Path tempDir) {
@@ -3545,6 +3848,15 @@ public class TestRESTCatalog extends
CatalogTests<RESTCatalog> {
&& Objects.equals(req.body(), body));
}
+  /** Matches requests with the given method and path whose headers include all given entries. */
+  static HTTPRequest reqMatcherContainsHeaders(
+      HTTPMethod method, String path, Map<String, String> headers) {
+    return argThat(
+        req -> {
+          if (req.method() != method) {
+            return false;
+          }
+
+          if (!req.path().equals(path)) {
+            return false;
+          }
+
+          return req.headers().entries().containsAll(HTTPHeaders.of(headers).entries());
+        });
+  }
+
private static List<HTTPRequest> allRequests(RESTCatalogAdapter adapter) {
ArgumentCaptor<HTTPRequest> captor =
ArgumentCaptor.forClass(HTTPRequest.class);
verify(adapter, atLeastOnce()).execute(captor.capture(), any(), any(),
any());