huaxingao commented on code in PR #14773:
URL: https://github.com/apache/iceberg/pull/14773#discussion_r2677315825
##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -124,12 +133,107 @@ public class TestRESTCatalog extends CatalogTests<RESTCatalog> {
RESTCatalogProperties.NAMESPACE_SEPARATOR,
RESTCatalogAdapter.NAMESPACE_SEPARATOR_URLENCODED_UTF_8));
+ /**
+ * Test-only adapter that keeps request/response round-trip serialization and header validation
+ * from the base test setup, while also allowing specific tests to inject transient failures.
+ */
+ private static class HeaderValidatingAdapter extends RESTCatalogAdapter {
+ private final HTTPHeaders catalogHeaders;
+ private final HTTPHeaders contextHeaders;
+ private final Set<String> simulate503OnFirstSuccessKeys =
+ org.apache.iceberg.relocated.com.google.common.collect.Sets.newConcurrentHashSet();
Review Comment:
Good idea. I generalized the test-only hook to a key -> RuntimeException map
so tests can simulate different post-success transient failures while still
keeping simulate503OnFirstSuccessForKey as a convenience wrapper for the
current case.
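A minimal sketch of what such a key -> RuntimeException hook might look like. The class name, the `execute` method, and the exception type used for the 503 case are all hypothetical stand-ins so the example compiles on its own; only `simulate503OnFirstSuccessForKey` is a name taken from this thread, and the PR's actual adapter code may differ.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical test-only hook; illustrative, not the PR's actual adapter code.
class PostSuccessFailureInjector {
  // Idempotency key -> failure to throw once, after the real action has succeeded.
  private final Map<String, RuntimeException> failuresByKey = new ConcurrentHashMap<>();

  // General form: register any RuntimeException for a key.
  void simulateFailureOnFirstSuccessForKey(String key, RuntimeException failure) {
    failuresByKey.put(key, failure);
  }

  // Convenience wrapper for the 503 case (IllegalStateException is a stand-in type).
  void simulate503OnFirstSuccessForKey(String key) {
    simulateFailureOnFirstSuccessForKey(
        key, new IllegalStateException("503 Service Unavailable (simulated)"));
  }

  <T> T execute(String key, Supplier<T> action) {
    T result = action.get(); // the server-side work completes first
    // remove() makes the injected failure one-shot: a retry with the same key succeeds.
    RuntimeException failure = key == null ? null : failuresByKey.remove(key);
    if (failure != null) {
      throw failure;
    }
    return result;
  }
}
```

Because the entry is removed when it fires, the client sees a failure only on the first attempt even though the action itself succeeded, which is the situation an idempotent retry has to handle.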
##########
core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java:
##########
@@ -268,19 +277,29 @@ public <T extends RESTResponse> T handleRequest(
return castResponse(
responseType, CatalogHandlers.stageTableCreate(catalog, namespace, request));
} else {
- LoadTableResponse response = CatalogHandlers.createTable(catalog, namespace, request);
- responseHeaders.accept(
- ImmutableMap.of(HttpHeaders.ETAG, ETagProvider.of(response.metadataLocation())));
- return castResponse(responseType, response);
+ return CatalogHandlers.withIdempotency(
+ httpRequest,
+ () -> {
+ LoadTableResponse response =
+ CatalogHandlers.createTable(catalog, namespace, request);
+ responseHeaders.accept(
+ ImmutableMap.of(
+ HttpHeaders.ETAG, ETagProvider.of(response.metadataLocation())));
+ return castResponse(responseType, response);
+ });
}
}
case DROP_TABLE:
{
if (PropertyUtil.propertyAsBoolean(vars, "purgeRequested", false)) {
- CatalogHandlers.purgeTable(catalog, tableIdentFromPathVars(vars));
+ CatalogHandlers.withIdempotency(
Review Comment:
Good point. The idempotency behavior is driven by the HTTP request headers,
so I’m keeping the wrapping at the routing/adapter layer rather than pushing
HTTPRequest through every CatalogHandlers.* method.
That said, I agree it doesn’t need to be exposed publicly — I made
withIdempotency(...) package-private and it’s only used internally by the REST
adapter/routes.
##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3313,6 +3393,155 @@ public void testClientDoesNotSendIdempotencyWhenServerNotAdvertising() {
local.dropTable(ident);
}
+ @Test
+ public void testIdempotentDuplicateCreateReturnsCached() {
+ String key = "dup-create-key";
+ Namespace ns = Namespace.of("ns_dup");
+ Pair<TableIdentifier, Pair<RESTClient, Map<String, String>>> env =
+ prepareIdempotentEnv(key, ns, "t_dup");
+ TableIdentifier ident = env.first();
+ Pair<RESTClient, Map<String, String>> httpAndHeaders = env.second();
+ RESTClient http = httpAndHeaders.first();
+ Map<String, String> headers = httpAndHeaders.second();
+ CreateTableRequest req = createReq(ident);
Review Comment:
I factored the repeated prepareIdempotentEnv unpacking into a small IdempotentEnv helper.
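For illustration, such a helper could look roughly like the sketch below. The field names and generic type parameters are assumptions; `I` and `C` stand in for `TableIdentifier` and `RESTClient` so the example compiles without Iceberg on the classpath.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical shape for the helper: named accessors replace nested Pair unpacking.
// I stands in for TableIdentifier and C for RESTClient.
final class IdempotentEnv<I, C> {
  private final I ident;
  private final C http;
  private final Map<String, String> headers;

  IdempotentEnv(I ident, C http, Map<String, String> headers) {
    this.ident = ident;
    this.http = http;
    // Defensive copy so tests cannot mutate the headers after setup.
    this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
  }

  I ident() {
    return ident;
  }

  C http() {
    return http;
  }

  Map<String, String> headers() {
    return headers;
  }
}
```

A test would then read `env.ident()`, `env.http()`, and `env.headers()` directly instead of chaining `first()` and `second()` calls.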
##########
core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java:
##########
@@ -108,8 +115,152 @@ public class CatalogHandlers {
InMemoryPlanningState.getInstance();
private static final ExecutorService ASYNC_PLANNING_POOL =
Executors.newSingleThreadExecutor();
+ // Advanced idempotency store with TTL and in-flight coalescing.
+ //
+ // Note: This is a simple in-memory implementation meant for tests and lightweight usage.
+ // Production servers should provide a durable store.
+ private static final ConcurrentMap<String, IdempotencyEntry> IDEMPOTENCY_STORE =
+ Maps.newConcurrentMap();
+ private static volatile long idempotencyLifetimeMillis = TimeUnit.MINUTES.toMillis(30);
+
private CatalogHandlers() {}
+ @SuppressWarnings("unchecked")
+ public static <T extends RESTResponse> T withIdempotency(
+ HTTPRequest httpRequest, Supplier<T> action) {
+ return withIdempotencyInternal(httpRequest, action);
+ }
+
+ public static void withIdempotency(HTTPRequest httpRequest, Runnable action) {
+ withIdempotencyInternal(
+ httpRequest,
+ () -> {
+ action.run();
+ return Boolean.TRUE;
+ });
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> T withIdempotencyInternal(HTTPRequest httpRequest, Supplier<T> action) {
+ Optional<HTTPHeaders.HTTPHeader> keyHeader =
+ httpRequest.headers().firstEntry(RESTUtil.IDEMPOTENCY_KEY_HEADER);
+ if (keyHeader.isEmpty()) {
+ return action.get();
+ }
+
+ String key = keyHeader.get().value();
+
+ // "Leader" is the request thread that wins the IDEMPOTENCY_STORE.compute(...) and creates (or
+ // replaces) the IN_PROGRESS entry for this Idempotency-Key. Only the leader executes the
+ // action and finalizes the entry; concurrent requests for the same key ("followers") wait on
+ // the latch and then replay the finalized result/error.
+ AtomicBoolean isLeader = new AtomicBoolean(false);
Review Comment:
Changed to `isFirst`. Thanks
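For readers following the thread, here is a simplified, self-contained sketch of the first-request/follower pattern described in the code comment above. It assumes a plain String key instead of an HTTPRequest and omits TTL handling and entry replacement; the class and its `Entry` type are hypothetical and are not the PR's implementation.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Illustrative only: no TTL, no eviction, String key instead of an HTTP header lookup.
class IdempotencySketch {
  private static final class Entry {
    final CountDownLatch done = new CountDownLatch(1);
    volatile Object result;
    volatile RuntimeException error;
  }

  private final ConcurrentMap<String, Entry> store = new ConcurrentHashMap<>();

  @SuppressWarnings("unchecked")
  <T> T withIdempotency(String key, Supplier<T> action) {
    if (key == null) {
      return action.get(); // no key supplied: run the action directly
    }

    // compute(...) is atomic per key, so exactly one caller creates the entry.
    AtomicBoolean isFirst = new AtomicBoolean(false);
    Entry entry =
        store.compute(
            key,
            (k, existing) -> {
              if (existing != null) {
                return existing; // another request already owns this key
              }
              isFirst.set(true);
              return new Entry();
            });

    if (isFirst.get()) {
      // The first request runs the action and finalizes the entry.
      try {
        entry.result = action.get();
      } catch (RuntimeException e) {
        entry.error = e;
      } finally {
        entry.done.countDown(); // release any waiting followers
      }
    } else {
      // Followers wait for the first request to finish, then replay its outcome.
      try {
        entry.done.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for key " + key, e);
      }
    }

    if (entry.error != null) {
      throw entry.error;
    }
    return (T) entry.result;
  }
}
```

In this sketch a second call with the same key returns the stored result without running its own action, and a stored exception is rethrown to every caller that used that key.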
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]