huaxingao commented on code in PR #14773:
URL: https://github.com/apache/iceberg/pull/14773#discussion_r2663685626
##########
core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java:
##########
@@ -108,8 +114,147 @@ public class CatalogHandlers {
InMemoryPlanningState.getInstance();
private static final ExecutorService ASYNC_PLANNING_POOL =
Executors.newSingleThreadExecutor();
+ // Advanced idempotency store with TTL and in-flight coalescing.
+ //
+ // Note: This is a simple in-memory implementation meant for tests and
lightweight usage.
+ // Production servers should provide a durable store.
+ private static final ConcurrentMap<String, IdempotencyEntry>
IDEMPOTENCY_STORE =
+ Maps.newConcurrentMap();
+ private static volatile long idempotencyLifetimeMillis =
TimeUnit.MINUTES.toMillis(30);
+
private CatalogHandlers() {}
+ @SuppressWarnings("unchecked")
+ public static <T extends RESTResponse> T withIdempotency(
+ HTTPRequest httpRequest, Supplier<T> action) {
+ return withIdempotencyInternal(httpRequest, action);
+ }
+
+ public static void withIdempotency(HTTPRequest httpRequest, Runnable action)
{
+ withIdempotencyInternal(
+ httpRequest,
+ () -> {
+ action.run();
+ return Boolean.TRUE;
+ });
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> T withIdempotencyInternal(HTTPRequest httpRequest,
Supplier<T> action) {
+ Optional<HTTPHeaders.HTTPHeader> keyHeader =
+ httpRequest.headers().firstEntry(RESTUtil.IDEMPOTENCY_KEY_HEADER);
+ if (keyHeader.isEmpty()) {
+ return action.get();
+ }
+
+ String key = keyHeader.get().value();
+
+ AtomicBoolean isLeader = new AtomicBoolean(false);
Review Comment:
“Leader” here means the request thread that won the
IDEMPOTENCY_STORE.compute(...) and created (or replaced) the IN_PROGRESS entry
for this Idempotency-Key. That thread executes the action and finalizes the
entry; all other concurrent requests for the same key are “followers” that wait
on the latch and then replay the finalized result/error.
##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3312,6 +3319,164 @@ public void
testClientDoesNotSendIdempotencyWhenServerNotAdvertising() {
local.dropTable(ident);
}
+ @Test
+ public void testIdempotentDuplicateCreateReturnsCached() {
+ String key = "dup-create-key";
+ Namespace ns = Namespace.of("ns_dup");
+ Pair<TableIdentifier, Pair<RESTClient, Map<String, String>>> env =
+ prepareIdempotentEnv(key, ns, "t_dup");
+ TableIdentifier ident = env.first();
+ Pair<RESTClient, Map<String, String>> httpAndHeaders = env.second();
+ RESTClient http = httpAndHeaders.first();
+ Map<String, String> headers = httpAndHeaders.second();
+ CreateTableRequest req = createReq(ident);
+
+ // First create succeeds
+ LoadTableResponse first =
+ http.post(
+ ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+ req,
+ LoadTableResponse.class,
+ headers,
+ ErrorHandlers.tableErrorHandler());
+ assertThat(first).isNotNull();
+
+ // Verify request shape (method, path, headers including Idempotency-Key)
+ verifyCreatePost(ns, headers);
+
+ // Duplicate with same key returns cached 200 OK
+ LoadTableResponse second =
+ http.post(
+ ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+ req,
+ LoadTableResponse.class,
+ headers,
+ ErrorHandlers.tableErrorHandler());
+ assertThat(second).isNotNull();
+
+ // Clean up
+ restCatalog.dropTable(ident);
+ }
+
+ @Test
+ public void testIdempotencyKeyLifetimeExpiredTreatsAsNew() {
+ // Set TTL to 0 so cached success expires immediately
+ CatalogHandlers.setIdempotencyLifetimeFromIso("PT0S");
+ try {
+ String key = "expired-create-key";
+ Namespace ns = Namespace.of("ns_exp");
+ Pair<TableIdentifier, Pair<RESTClient, Map<String, String>>> env =
+ prepareIdempotentEnv(key, ns, "t_exp");
+ TableIdentifier ident = env.first();
+ Pair<RESTClient, Map<String, String>> httpAndHeaders = env.second();
+ RESTClient http = httpAndHeaders.first();
+ Map<String, String> headers = httpAndHeaders.second();
+ CreateTableRequest req = createReq(ident);
+
+ // First create succeeds
+ LoadTableResponse created =
+ http.post(
+ ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+ req,
+ LoadTableResponse.class,
+ headers,
+ ErrorHandlers.tableErrorHandler());
+ assertThat(created).isNotNull();
+
+ // Verify request shape (method, path, headers including Idempotency-Key)
+ verifyCreatePost(ns, headers);
+
+ // TTL expired -> duplicate with same key should be treated as new and
fail with AlreadyExists
+ assertThatThrownBy(
+ () ->
+ http.post(
+
ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns),
+ req,
+ LoadTableResponse.class,
+ headers,
+ ErrorHandlers.tableErrorHandler()))
+ .isInstanceOf(AlreadyExistsException.class)
+ .hasMessageContaining(ident.toString());
+
+ // Clean up
+ restCatalog.dropTable(ident);
Review Comment:
Good catch. This is not needed. Removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]