This is an automated email from the ASF dual-hosted git repository.
yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new d8d87a5ad JDBC: add DB-agnostic idempotency store + model (#3584)
d8d87a5ad is described below
commit d8d87a5ad6e3e1f216fa8d474c32e5555da88c01
Author: Huaxin Gao <[email protected]>
AuthorDate: Wed Feb 11 16:06:47 2026 -0800
JDBC: add DB-agnostic idempotency store + model (#3584)
---
persistence/relational-jdbc/build.gradle.kts | 3 +
.../relational/jdbc/QueryGenerator.java | 117 +++++++++-
.../jdbc/idempotency/PostgresIdempotencyStore.java | 255 ---------------------
.../RelationalJdbcIdempotencyStore.java | 250 ++++++++++++++++++++
.../jdbc/models/ModelIdempotencyRecord.java | 199 ++++++++++++++++
.../relational/jdbc/QueryGeneratorTest.java | 127 ++++++++++
... RelationalJdbcIdempotencyStorePostgresIT.java} | 25 +-
.../entity}/IdempotencyRecord.java | 52 ++++-
.../IdempotencyPersistenceException.java | 37 +++
.../polaris/core/persistence/IdempotencyStore.java | 2 +-
10 files changed, 794 insertions(+), 273 deletions(-)
diff --git a/persistence/relational-jdbc/build.gradle.kts
b/persistence/relational-jdbc/build.gradle.kts
index c3e425372..10cb2e8fe 100644
--- a/persistence/relational-jdbc/build.gradle.kts
+++ b/persistence/relational-jdbc/build.gradle.kts
@@ -47,4 +47,7 @@ dependencies {
testImplementation("org.testcontainers:testcontainers-junit-jupiter")
testImplementation("org.testcontainers:testcontainers-postgresql")
+
+ testImplementation(project(":polaris-container-spec-helper"))
+ testImplementation(project(":polaris-runtime-test-common"))
}
diff --git
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java
index 485956ed8..a8720fda5 100644
---
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java
+++
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java
@@ -192,6 +192,60 @@ public class QueryGenerator {
return new PreparedQuery(sql, bindingParams);
}
+ /**
+ * Builds an UPDATE query that updates only the specified columns and
supports richer WHERE
+ * predicates (equality, greater-than, less-than, IS NULL, IS NOT NULL).
+ *
+ * <p>Callers should prefer passing an ordered map (e.g. {@link
java.util.LinkedHashMap}) for the
+ * set clause so generated SQL and parameter order are consistent.
+ *
+ * @param tableColumns List of valid table columns.
+ * @param tableName Target table.
+ * @param setClause Column-value pairs to update.
+ * @param whereEquals Column-value pairs used in WHERE equality filtering.
+ * @param whereGreater Column-value pairs used in WHERE greater-than
filtering.
+ * @param whereLess Column-value pairs used in WHERE less-than filtering.
+ * @param whereIsNull Columns that must be NULL.
+ * @param whereIsNotNull Columns that must be NOT NULL.
+ * @return UPDATE query with parameter bindings.
+ */
+ public static PreparedQuery generateUpdateQuery(
+ @Nonnull List<String> tableColumns,
+ @Nonnull String tableName,
+ @Nonnull Map<String, Object> setClause,
+ @Nonnull Map<String, Object> whereEquals,
+ @Nonnull Map<String, Object> whereGreater,
+ @Nonnull Map<String, Object> whereLess,
+ @Nonnull Set<String> whereIsNull,
+ @Nonnull Set<String> whereIsNotNull) {
+ if (setClause.isEmpty()) {
+ throw new IllegalArgumentException("Empty setClause");
+ }
+
+ Set<String> columns = new HashSet<>(tableColumns);
+ validateColumns(columns, setClause.keySet());
+
+ QueryFragment where =
+ generateWhereClauseExtended(
+ columns, whereEquals, whereGreater, whereLess, whereIsNull,
whereIsNotNull);
+
+ List<String> setParts = new ArrayList<>();
+ List<Object> params = new ArrayList<>();
+ for (Map.Entry<String, Object> entry : setClause.entrySet()) {
+ setParts.add(entry.getKey() + " = ?");
+ params.add(entry.getValue());
+ }
+ params.addAll(where.parameters());
+
+ String sql =
+ "UPDATE "
+ + getFullyQualifiedTableName(tableName)
+ + " SET "
+ + String.join(", ", setParts)
+ + where.sql();
+ return new PreparedQuery(sql, params);
+ }
+
/**
* Builds a DELETE query with the given conditions.
*
@@ -209,6 +263,26 @@ public class QueryGenerator {
"DELETE FROM " + getFullyQualifiedTableName(tableName) + where.sql(),
where.parameters());
}
+ /**
+ * Builds a DELETE query that supports richer WHERE predicates (equality,
greater-than, less-than,
+ * IS NULL, IS NOT NULL).
+ */
+ public static PreparedQuery generateDeleteQuery(
+ @Nonnull List<String> tableColumns,
+ @Nonnull String tableName,
+ @Nonnull Map<String, Object> whereEquals,
+ @Nonnull Map<String, Object> whereGreater,
+ @Nonnull Map<String, Object> whereLess,
+ @Nonnull Set<String> whereIsNull,
+ @Nonnull Set<String> whereIsNotNull) {
+ Set<String> columns = new HashSet<>(tableColumns);
+ QueryFragment where =
+ generateWhereClauseExtended(
+ columns, whereEquals, whereGreater, whereLess, whereIsNull,
whereIsNotNull);
+ return new PreparedQuery(
+ "DELETE FROM " + getFullyQualifiedTableName(tableName) + where.sql(),
where.parameters());
+ }
+
private static PreparedQuery generateSelectQuery(
@Nonnull List<String> columnNames,
@Nonnull String tableName,
@@ -231,22 +305,53 @@ public class QueryGenerator {
@Nonnull Set<String> tableColumns,
@Nonnull Map<String, Object> whereEquals,
@Nonnull Map<String, Object> whereGreater) {
+ return generateWhereClauseExtended(
+ tableColumns, whereEquals, whereGreater, Map.of(), Set.of(), Set.of());
+ }
+
+ private static void validateColumns(
+ @Nonnull Set<String> tableColumns, @Nonnull Set<String> columns) {
+ for (String column : columns) {
+ if (!tableColumns.contains(column) && !column.equals("realm_id")) {
+ throw new IllegalArgumentException("Invalid query column: " + column);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ static QueryFragment generateWhereClauseExtended(
+ @Nonnull Set<String> tableColumns,
+ @Nonnull Map<String, Object> whereEquals,
+ @Nonnull Map<String, Object> whereGreater,
+ @Nonnull Map<String, Object> whereLess,
+ @Nonnull Set<String> whereIsNull,
+ @Nonnull Set<String> whereIsNotNull) {
+ validateColumns(tableColumns, whereEquals.keySet());
+ validateColumns(tableColumns, whereGreater.keySet());
+ validateColumns(tableColumns, whereLess.keySet());
+ validateColumns(tableColumns, whereIsNull);
+ validateColumns(tableColumns, whereIsNotNull);
+
List<String> conditions = new ArrayList<>();
List<Object> parameters = new ArrayList<>();
for (Map.Entry<String, Object> entry : whereEquals.entrySet()) {
- if (!tableColumns.contains(entry.getKey()) &&
!entry.getKey().equals("realm_id")) {
- throw new IllegalArgumentException("Invalid query column: " +
entry.getKey());
- }
conditions.add(entry.getKey() + " = ?");
parameters.add(entry.getValue());
}
for (Map.Entry<String, Object> entry : whereGreater.entrySet()) {
- if (!tableColumns.contains(entry.getKey()) &&
!entry.getKey().equals("realm_id")) {
- throw new IllegalArgumentException("Invalid query column: " +
entry.getKey());
- }
conditions.add(entry.getKey() + " > ?");
parameters.add(entry.getValue());
}
+ for (Map.Entry<String, Object> entry : whereLess.entrySet()) {
+ conditions.add(entry.getKey() + " < ?");
+ parameters.add(entry.getValue());
+ }
+ for (String column : whereIsNull) {
+ conditions.add(column + " IS NULL");
+ }
+ for (String column : whereIsNotNull) {
+ conditions.add(column + " IS NOT NULL");
+ }
String clause = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND
", conditions);
return new QueryFragment(clause, parameters);
}
diff --git
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java
deleted file mode 100644
index ff826e19e..000000000
---
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.polaris.persistence.relational.jdbc.idempotency;
-
-import jakarta.annotation.Nonnull;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import javax.sql.DataSource;
-import org.apache.polaris.core.persistence.IdempotencyStore;
-import org.apache.polaris.core.persistence.IdempotencyStore.HeartbeatResult;
-import org.apache.polaris.idempotency.IdempotencyRecord;
-import org.apache.polaris.persistence.relational.jdbc.DatabaseType;
-import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations;
-import org.apache.polaris.persistence.relational.jdbc.QueryGenerator;
-import
org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration;
-import org.apache.polaris.persistence.relational.jdbc.models.Converter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Postgres implementation of IdempotencyStore. */
-public final class PostgresIdempotencyStore implements IdempotencyStore {
- private static final Logger LOG =
LoggerFactory.getLogger(PostgresIdempotencyStore.class);
-
- private static final String TABLE = "POLARIS_SCHEMA.idempotency_records";
-
- private final DatasourceOperations ops;
-
- public PostgresIdempotencyStore(
- @Nonnull DataSource dataSource, @Nonnull RelationalJdbcConfiguration cfg)
- throws SQLException {
- this.ops = new DatasourceOperations(dataSource, cfg);
- }
-
- @Override
- public ReserveResult reserve(
- String realmId,
- String idempotencyKey,
- String operationType,
- String normalizedResourceId,
- Instant expiresAt,
- String executorId,
- Instant now) {
- String sql =
- "INSERT INTO "
- + TABLE
- + " (realm_id, idempotency_key, operation_type, resource_id,"
- + " http_status, error_subtype, response_summary,
response_headers, finalized_at,"
- + " created_at, updated_at, heartbeat_at, executor_id, expires_at)"
- + " VALUES (?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, ?, ?, ?, ?,
?)"
- + " ON CONFLICT (realm_id, idempotency_key) DO NOTHING";
- List<Object> params =
- List.of(
- realmId,
- idempotencyKey,
- operationType,
- normalizedResourceId,
- Timestamp.from(now),
- Timestamp.from(now),
- Timestamp.from(now),
- executorId,
- Timestamp.from(expiresAt));
- try {
- int updated = ops.executeUpdate(new QueryGenerator.PreparedQuery(sql,
params));
- if (updated == 1) {
- return new ReserveResult(ReserveResultType.OWNED, Optional.empty());
- } else {
- // Load existing to return to caller
- return new ReserveResult(ReserveResultType.DUPLICATE, load(realmId,
idempotencyKey));
- }
- } catch (SQLException e) {
- throw new RuntimeException("Failed to reserve idempotency key", e);
- }
- }
-
- @Override
- public Optional<IdempotencyRecord> load(String realmId, String
idempotencyKey) {
- String sql =
- "SELECT realm_id, idempotency_key, operation_type, resource_id,
http_status, error_subtype,"
- + " response_summary, response_headers, created_at, updated_at,
finalized_at, heartbeat_at,"
- + " executor_id, expires_at"
- + " FROM "
- + TABLE
- + " WHERE realm_id = ? AND idempotency_key = ?";
- try {
- final IdempotencyRecord[] holder = new IdempotencyRecord[1];
- ops.executeSelectOverStream(
- new QueryGenerator.PreparedQuery(sql, List.of(realmId,
idempotencyKey)),
- new Converter<IdempotencyRecord>() {
- @Override
- public IdempotencyRecord fromResultSet(ResultSet rs) throws
SQLException {
- return convert(rs);
- }
-
- @Override
- public Map<String, Object> toMap(DatabaseType databaseType) {
- return Map.of();
- }
- },
- stream -> stream.findFirst().ifPresent(r -> holder[0] = r));
- return Optional.ofNullable(holder[0]);
- } catch (SQLException e) {
- throw new RuntimeException("Failed to load idempotency record", e);
- }
- }
-
- @Override
- public HeartbeatResult updateHeartbeat(
- String realmId, String idempotencyKey, String executorId, Instant now) {
- String sql =
- "UPDATE "
- + TABLE
- + " SET heartbeat_at = ?, updated_at = ?"
- + " WHERE realm_id = ? AND idempotency_key = ?"
- + " AND http_status IS NULL"
- + " AND (executor_id IS NULL OR executor_id = ?)";
- try {
- int rows =
- ops.executeUpdate(
- new QueryGenerator.PreparedQuery(
- sql,
- List.of(
- Timestamp.from(now),
- Timestamp.from(now),
- realmId,
- idempotencyKey,
- executorId)));
- if (rows > 0) {
- return HeartbeatResult.UPDATED;
- }
-
- // No rows updated: determine why by loading the current record, if any.
- // TODO: consider using a single atomic read/write (for example,
PostgreSQL
- // UPDATE ... RETURNING) to avoid this follow-up lookup and make the
- // conflicting state observable in the same operation.
- Optional<IdempotencyRecord> existing = load(realmId, idempotencyKey);
- if (existing.isEmpty()) {
- return HeartbeatResult.NOT_FOUND;
- }
-
- IdempotencyRecord record = existing.get();
- if (record.getHttpStatus() != null) {
- return HeartbeatResult.FINALIZED;
- }
-
- // Record is still IN_PROGRESS but owned by a different executor.
- return HeartbeatResult.LOST_OWNERSHIP;
- } catch (SQLException e) {
- throw new RuntimeException("Failed to update heartbeat", e);
- }
- }
-
- @Override
- public boolean finalizeRecord(
- String realmId,
- String idempotencyKey,
- Integer httpStatus,
- String errorSubtype,
- String responseSummary,
- String responseHeaders,
- Instant finalizedAt) {
- String sql =
- "UPDATE "
- + TABLE
- + " SET http_status = ?, error_subtype = ?, response_summary = ?,
response_headers = ?,"
- + " finalized_at = ?, updated_at = ?"
- + " WHERE realm_id = ? AND idempotency_key = ? AND http_status IS
NULL";
- try {
- int rows =
- ops.executeUpdate(
- new QueryGenerator.PreparedQuery(
- sql,
- Arrays.asList(
- httpStatus,
- errorSubtype,
- responseSummary,
- responseHeaders,
- Timestamp.from(finalizedAt),
- Timestamp.from(finalizedAt),
- realmId,
- idempotencyKey)));
- return rows > 0;
- } catch (SQLException e) {
- throw new RuntimeException("Failed to finalize idempotency record", e);
- }
- }
-
- @Override
- public int purgeExpired(String realmId, Instant before) {
- String sql = "DELETE FROM " + TABLE + " WHERE realm_id = ? AND expires_at
< ?";
- try {
- return ops.executeUpdate(
- new QueryGenerator.PreparedQuery(sql, List.of(realmId,
Timestamp.from(before))));
- } catch (SQLException e) {
- throw new RuntimeException("Failed to purge expired idempotency
records", e);
- }
- }
-
- private static IdempotencyRecord convert(ResultSet rs) {
- try {
- String realmId = rs.getString("realm_id");
- String idempotencyKey = rs.getString("idempotency_key");
- String operationType = rs.getString("operation_type");
- String resourceId = rs.getString("resource_id");
- Integer httpStatus = (Integer) rs.getObject("http_status");
- String errorSubtype = rs.getString("error_subtype");
- String responseSummary = rs.getString("response_summary");
- String responseHeaders = rs.getString("response_headers");
- Instant createdAt = rs.getTimestamp("created_at").toInstant();
- Instant updatedAt = rs.getTimestamp("updated_at").toInstant();
- Timestamp fts = rs.getTimestamp("finalized_at");
- Instant finalizedAt = fts == null ? null : fts.toInstant();
- Timestamp hb = rs.getTimestamp("heartbeat_at");
- Instant heartbeatAt = hb == null ? null : hb.toInstant();
- String executorId = rs.getString("executor_id");
- Instant expiresAt = rs.getTimestamp("expires_at").toInstant();
- return new IdempotencyRecord(
- realmId,
- idempotencyKey,
- operationType,
- resourceId,
- httpStatus,
- errorSubtype,
- responseSummary,
- responseHeaders,
- createdAt,
- updatedAt,
- finalizedAt,
- heartbeatAt,
- executorId,
- expiresAt);
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
-}
diff --git
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java
new file mode 100644
index 000000000..6f3143aef
--- /dev/null
+++
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.polaris.persistence.relational.jdbc.idempotency;
+
+import jakarta.annotation.Nonnull;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import javax.sql.DataSource;
+import org.apache.polaris.core.entity.IdempotencyRecord;
+import org.apache.polaris.core.persistence.IdempotencyPersistenceException;
+import org.apache.polaris.core.persistence.IdempotencyStore;
+import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations;
+import org.apache.polaris.persistence.relational.jdbc.QueryGenerator;
+import
org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration;
+import org.apache.polaris.persistence.relational.jdbc.models.Converter;
+import
org.apache.polaris.persistence.relational.jdbc.models.ModelIdempotencyRecord;
+
+public class RelationalJdbcIdempotencyStore implements IdempotencyStore {
+
+ private final DatasourceOperations datasourceOperations;
+
+ public RelationalJdbcIdempotencyStore(
+ @Nonnull DataSource dataSource, @Nonnull RelationalJdbcConfiguration cfg)
+ throws SQLException {
+ this.datasourceOperations = new DatasourceOperations(dataSource, cfg);
+ }
+
+ @Override
+ public ReserveResult reserve(
+ String realmId,
+ String idempotencyKey,
+ String operationType,
+ String normalizedResourceId,
+ Instant expiresAt,
+ String executorId,
+ Instant now) {
+ try {
+ // Build insert values directly to avoid requiring an
Immutables-generated model type.
+ Map<String, Object> insertMap = new LinkedHashMap<>();
+ insertMap.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey);
+ insertMap.put(ModelIdempotencyRecord.OPERATION_TYPE, operationType);
+ insertMap.put(ModelIdempotencyRecord.RESOURCE_ID, normalizedResourceId);
+ insertMap.put(ModelIdempotencyRecord.HTTP_STATUS, null);
+ insertMap.put(ModelIdempotencyRecord.ERROR_SUBTYPE, null);
+ insertMap.put(ModelIdempotencyRecord.RESPONSE_SUMMARY, null);
+ insertMap.put(ModelIdempotencyRecord.RESPONSE_HEADERS, null);
+ insertMap.put(ModelIdempotencyRecord.FINALIZED_AT, null);
+ insertMap.put(ModelIdempotencyRecord.CREATED_AT, Timestamp.from(now));
+ insertMap.put(ModelIdempotencyRecord.UPDATED_AT, Timestamp.from(now));
+ insertMap.put(ModelIdempotencyRecord.HEARTBEAT_AT, Timestamp.from(now));
+ insertMap.put(ModelIdempotencyRecord.EXECUTOR_ID, executorId);
+ insertMap.put(ModelIdempotencyRecord.EXPIRES_AT,
Timestamp.from(expiresAt));
+
+ List<Object> values = insertMap.values().stream().toList();
+ QueryGenerator.PreparedQuery insert =
+ QueryGenerator.generateInsertQuery(
+ ModelIdempotencyRecord.ALL_COLUMNS,
+ ModelIdempotencyRecord.TABLE_NAME,
+ values,
+ realmId);
+ datasourceOperations.executeUpdate(insert);
+ return new ReserveResult(ReserveResultType.OWNED, Optional.empty());
+ } catch (SQLException e) {
+ if (datasourceOperations.isConstraintViolation(e)) {
+ return new ReserveResult(ReserveResultType.DUPLICATE, load(realmId,
idempotencyKey));
+ }
+ throw new IdempotencyPersistenceException("Failed to reserve idempotency
key", e);
+ }
+ }
+
+ @Override
+ public Optional<IdempotencyRecord> load(String realmId, String
idempotencyKey) {
+ try {
+ QueryGenerator.PreparedQuery query =
+ QueryGenerator.generateSelectQuery(
+ ModelIdempotencyRecord.ALL_COLUMNS,
+ ModelIdempotencyRecord.TABLE_NAME,
+ Map.of(
+ ModelIdempotencyRecord.REALM_ID,
+ realmId,
+ ModelIdempotencyRecord.IDEMPOTENCY_KEY,
+ idempotencyKey));
+ List<IdempotencyRecord> results =
+ datasourceOperations.executeSelect(
+ query,
+ new Converter<>() {
+ @Override
+ public IdempotencyRecord fromResultSet(ResultSet rs) throws
SQLException {
+ return ModelIdempotencyRecord.fromRow(realmId, rs);
+ }
+
+ @Override
+ public Map<String, Object> toMap(
+
org.apache.polaris.persistence.relational.jdbc.DatabaseType databaseType) {
+ throw new UnsupportedOperationException("Not used for SELECT
conversion");
+ }
+ });
+ if (results.isEmpty()) {
+ return Optional.empty();
+ }
+ if (results.size() > 1) {
+ throw new IllegalStateException(
+ "More than one idempotency record found for realm/key: "
+ + realmId
+ + "/"
+ + idempotencyKey);
+ }
+ return Optional.of(results.getFirst());
+ } catch (SQLException e) {
+ throw new IdempotencyPersistenceException("Failed to load idempotency
record", e);
+ }
+ }
+
+ @Override
+ public HeartbeatResult updateHeartbeat(
+ String realmId, String idempotencyKey, String executorId, Instant now) {
+ Optional<IdempotencyRecord> existing = load(realmId, idempotencyKey);
+ if (existing.isEmpty()) {
+ return HeartbeatResult.NOT_FOUND;
+ }
+
+ IdempotencyRecord record = existing.get();
+ if (record.getHttpStatus() != null) {
+ return HeartbeatResult.FINALIZED;
+ }
+ if (record.getExecutorId() == null ||
!record.getExecutorId().equals(executorId)) {
+ return HeartbeatResult.LOST_OWNERSHIP;
+ }
+
+ QueryGenerator.PreparedQuery update =
+ QueryGenerator.generateUpdateQuery(
+ ModelIdempotencyRecord.ALL_COLUMNS,
+ ModelIdempotencyRecord.TABLE_NAME,
+ Map.of(
+ ModelIdempotencyRecord.HEARTBEAT_AT,
+ Timestamp.from(now),
+ ModelIdempotencyRecord.UPDATED_AT,
+ Timestamp.from(now)),
+ Map.of(
+ ModelIdempotencyRecord.REALM_ID,
+ realmId,
+ ModelIdempotencyRecord.IDEMPOTENCY_KEY,
+ idempotencyKey,
+ ModelIdempotencyRecord.EXECUTOR_ID,
+ executorId),
+ Map.of(),
+ Map.of(),
+ Set.of(ModelIdempotencyRecord.HTTP_STATUS),
+ Set.of());
+
+ try {
+ int updated = datasourceOperations.executeUpdate(update);
+ if (updated > 0) {
+ return HeartbeatResult.UPDATED;
+ }
+ } catch (SQLException e) {
+ throw new IdempotencyPersistenceException("Failed to update idempotency
heartbeat", e);
+ }
+
+ // Raced with finalize/ownership loss; re-check to return a meaningful
result.
+ Optional<IdempotencyRecord> after = load(realmId, idempotencyKey);
+ if (after.isEmpty()) {
+ return HeartbeatResult.NOT_FOUND;
+ }
+ if (after.get().getHttpStatus() != null) {
+ return HeartbeatResult.FINALIZED;
+ }
+ return HeartbeatResult.LOST_OWNERSHIP;
+ }
+
+ @Override
+ public boolean finalizeRecord(
+ String realmId,
+ String idempotencyKey,
+ Integer httpStatus,
+ String errorSubtype,
+ String responseSummary,
+ String responseHeaders,
+ Instant finalizedAt) {
+ // Use ordered/set maps so we can include nullable values (Map.of
disallows nulls).
+ Map<String, Object> setClause = new LinkedHashMap<>();
+ setClause.put(ModelIdempotencyRecord.HTTP_STATUS, httpStatus);
+ setClause.put(ModelIdempotencyRecord.ERROR_SUBTYPE, errorSubtype);
+ setClause.put(ModelIdempotencyRecord.RESPONSE_SUMMARY, responseSummary);
+ setClause.put(ModelIdempotencyRecord.RESPONSE_HEADERS, responseHeaders);
+ setClause.put(ModelIdempotencyRecord.FINALIZED_AT,
Timestamp.from(finalizedAt));
+ setClause.put(ModelIdempotencyRecord.UPDATED_AT,
Timestamp.from(finalizedAt));
+
+ Map<String, Object> whereEquals = new HashMap<>();
+ whereEquals.put(ModelIdempotencyRecord.REALM_ID, realmId);
+ whereEquals.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey);
+
+ QueryGenerator.PreparedQuery update =
+ QueryGenerator.generateUpdateQuery(
+ ModelIdempotencyRecord.ALL_COLUMNS,
+ ModelIdempotencyRecord.TABLE_NAME,
+ setClause,
+ whereEquals,
+ Map.of(),
+ Map.of(),
+ Set.of(ModelIdempotencyRecord.HTTP_STATUS),
+ Set.of());
+
+ try {
+ return datasourceOperations.executeUpdate(update) > 0;
+ } catch (SQLException e) {
+ throw new IdempotencyPersistenceException("Failed to finalize
idempotency record", e);
+ }
+ }
+
+ @Override
+ public int purgeExpired(String realmId, Instant before) {
+ try {
+ QueryGenerator.PreparedQuery delete =
+ QueryGenerator.generateDeleteQuery(
+ ModelIdempotencyRecord.ALL_COLUMNS,
+ ModelIdempotencyRecord.TABLE_NAME,
+ Map.of(ModelIdempotencyRecord.REALM_ID, realmId),
+ Map.of(),
+ Map.of(ModelIdempotencyRecord.EXPIRES_AT,
Timestamp.from(before)),
+ Set.of(),
+ Set.of(ModelIdempotencyRecord.EXPIRES_AT));
+ return datasourceOperations.executeUpdate(delete);
+ } catch (SQLException e) {
+ throw new IdempotencyPersistenceException("Failed to purge expired
idempotency records", e);
+ }
+ }
+}
diff --git
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java
new file mode 100644
index 000000000..74473cc24
--- /dev/null
+++
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.persistence.relational.jdbc.models;
+
+import jakarta.annotation.Nullable;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.polaris.core.entity.IdempotencyRecord;
+import org.apache.polaris.persistence.relational.jdbc.DatabaseType;
+
+/**
+ * JDBC model for {@link IdempotencyRecord} mirroring the {@code
idempotency_records} table.
+ *
+ * <p>This follows the same pattern as {@link ModelEvent}, separating the
storage representation
+ * from the core domain model while still providing {@link Converter} helpers.
+ *
+ * <p>Note: {@code realm_id} is treated as an implicit column across
relational-jdbc: callers can
+ * filter on it in WHERE clauses even if it is not included in the projection
list.
+ */
+public interface ModelIdempotencyRecord extends Converter<IdempotencyRecord> {
+
+ String TABLE_NAME = "idempotency_records";
+
+ // Logical tenant / realm identifier.
+ String REALM_ID = "realm_id";
+ // Client-provided idempotency key.
+ String IDEMPOTENCY_KEY = "idempotency_key";
+ // Logical operation type (e.g. commit-table).
+ String OPERATION_TYPE = "operation_type";
+ // Normalized identifier of the affected resource.
+ String RESOURCE_ID = "resource_id";
+
+ // Final HTTP status code once the operation is completed (null while
in-progress).
+ String HTTP_STATUS = "http_status";
+ // Optional error subtype for failures.
+ String ERROR_SUBTYPE = "error_subtype";
+ // Short serialized representation of the response body.
+ String RESPONSE_SUMMARY = "response_summary";
+ // Serialized representation of response headers.
+ String RESPONSE_HEADERS = "response_headers";
+ // Timestamp when the operation was finalized (null while in-progress).
+ String FINALIZED_AT = "finalized_at";
+
+ // Timestamp when the record was created.
+ String CREATED_AT = "created_at";
+ // Timestamp when the record was last updated.
+ String UPDATED_AT = "updated_at";
+ // Timestamp for the last heartbeat update (null if no heartbeat recorded).
+ String HEARTBEAT_AT = "heartbeat_at";
+ // Identifier of the executor that owns the in-progress record (null if not
owned).
+ String EXECUTOR_ID = "executor_id";
+ // Expiration timestamp after which the record can be considered
stale/purgeable.
+ String EXPIRES_AT = "expires_at";
+
+ List<String> ALL_COLUMNS =
+ List.of(
+ IDEMPOTENCY_KEY,
+ OPERATION_TYPE,
+ RESOURCE_ID,
+ HTTP_STATUS,
+ ERROR_SUBTYPE,
+ RESPONSE_SUMMARY,
+ RESPONSE_HEADERS,
+ FINALIZED_AT,
+ CREATED_AT,
+ UPDATED_AT,
+ HEARTBEAT_AT,
+ EXECUTOR_ID,
+ EXPIRES_AT);
+
+ String getRealmId();
+
+ String getIdempotencyKey();
+
+ String getOperationType();
+
+ String getResourceId();
+
+ @Nullable
+ Integer getHttpStatus();
+
+ @Nullable
+ String getErrorSubtype();
+
+ @Nullable
+ String getResponseSummary();
+
+ @Nullable
+ String getResponseHeaders();
+
+ @Nullable
+ Instant getFinalizedAt();
+
+ Instant getCreatedAt();
+
+ Instant getUpdatedAt();
+
+ @Nullable
+ Instant getHeartbeatAt();
+
+ @Nullable
+ String getExecutorId();
+
+ Instant getExpiresAt();
+
+ @Override
+ default IdempotencyRecord fromResultSet(ResultSet rs) throws SQLException {
+ return fromRow(rs);
+ }
+
+ /** Convert the current ResultSet row into an {@link IdempotencyRecord}. */
+ static IdempotencyRecord fromRow(ResultSet rs) throws SQLException {
+ // Requires realm_id to be projected in the ResultSet.
+ return fromRow(rs.getString(REALM_ID), rs);
+ }
+
+ /**
+ * Convert the current ResultSet row into an {@link IdempotencyRecord},
using {@code realmId} from
+ * call context (so callers can project only {@link #ALL_COLUMNS}).
+ */
+ static IdempotencyRecord fromRow(String realmId, ResultSet rs) throws
SQLException {
+ String idempotencyKey = rs.getString(IDEMPOTENCY_KEY);
+ String operationType = rs.getString(OPERATION_TYPE);
+ String resourceId = rs.getString(RESOURCE_ID);
+
+ Integer httpStatus = (Integer) rs.getObject(HTTP_STATUS);
+ String errorSubtype = rs.getString(ERROR_SUBTYPE);
+ String responseSummary = rs.getString(RESPONSE_SUMMARY);
+ String responseHeaders = rs.getString(RESPONSE_HEADERS);
+
+ Instant createdAt = rs.getTimestamp(CREATED_AT).toInstant();
+ Instant updatedAt = rs.getTimestamp(UPDATED_AT).toInstant();
+
+ Timestamp finalizedTs = rs.getTimestamp(FINALIZED_AT);
+ Instant finalizedAt = finalizedTs == null ? null : finalizedTs.toInstant();
+
+ Timestamp heartbeatTs = rs.getTimestamp(HEARTBEAT_AT);
+ Instant heartbeatAt = heartbeatTs == null ? null : heartbeatTs.toInstant();
+
+ String executorId = rs.getString(EXECUTOR_ID);
+ Instant expiresAt = rs.getTimestamp(EXPIRES_AT).toInstant();
+
+ return new IdempotencyRecord(
+ realmId,
+ idempotencyKey,
+ operationType,
+ resourceId,
+ httpStatus,
+ errorSubtype,
+ responseSummary,
+ responseHeaders,
+ createdAt,
+ updatedAt,
+ finalizedAt,
+ heartbeatAt,
+ executorId,
+ expiresAt);
+ }
+
+ @Override
+ default Map<String, Object> toMap(DatabaseType databaseType) {
+ Map<String, Object> map = new LinkedHashMap<>();
+ map.put(IDEMPOTENCY_KEY, getIdempotencyKey());
+ map.put(OPERATION_TYPE, getOperationType());
+ map.put(RESOURCE_ID, getResourceId());
+ map.put(HTTP_STATUS, getHttpStatus());
+ map.put(ERROR_SUBTYPE, getErrorSubtype());
+ map.put(RESPONSE_SUMMARY, getResponseSummary());
+ map.put(RESPONSE_HEADERS, getResponseHeaders());
+ map.put(FINALIZED_AT, getFinalizedAt() == null ? null :
Timestamp.from(getFinalizedAt()));
+ map.put(CREATED_AT, Timestamp.from(getCreatedAt()));
+ map.put(UPDATED_AT, Timestamp.from(getUpdatedAt()));
+ map.put(HEARTBEAT_AT, getHeartbeatAt() == null ? null :
Timestamp.from(getHeartbeatAt()));
+ map.put(EXECUTOR_ID, getExecutorId());
+ map.put(EXPIRES_AT, Timestamp.from(getExpiresAt()));
+ return map;
+ }
+}
diff --git
a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java
index d31b07788..38a06f2b5 100644
---
a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java
+++
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java
@@ -26,6 +26,8 @@ import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -220,6 +222,131 @@ public class QueryGeneratorTest {
assertEquals("", QueryGenerator.generateWhereClause(Set.of(), whereClause,
Map.of()).sql());
}
+ @Test
+ void testGenerateWhereClauseExtended_allPredicatesAndStableParameterOrder() {
+ Map<String, Object> whereEquals = new LinkedHashMap<>();
+ whereEquals.put("a", "A");
+ Map<String, Object> whereGreater = new LinkedHashMap<>();
+ whereGreater.put("b", 2);
+ Map<String, Object> whereLess = new LinkedHashMap<>();
+ whereLess.put("c", 3);
+
+ Set<String> whereIsNull = new LinkedHashSet<>(List.of("d"));
+ Set<String> whereIsNotNull = new LinkedHashSet<>(List.of("e"));
+
+ QueryGenerator.QueryFragment where =
+ QueryGenerator.generateWhereClauseExtended(
+ Set.of("a", "b", "c", "d", "e"),
+ whereEquals,
+ whereGreater,
+ whereLess,
+ whereIsNull,
+ whereIsNotNull);
+
+ assertEquals(" WHERE a = ? AND b > ? AND c < ? AND d IS NULL AND e IS NOT
NULL", where.sql());
+ Assertions.assertThat(where.parameters()).containsExactly("A", 2, 3);
+ }
+
+ @Test
+ void testGenerateUpdateQueryExtended_supportsNullSetValues() {
+ Map<String, Object> setClause = new LinkedHashMap<>();
+ setClause.put("error_subtype", null);
+ setClause.put("http_status", 200);
+
+ // Use ordered maps so WHERE clause order is deterministic.
+ Map<String, Object> whereEquals = new LinkedHashMap<>();
+ whereEquals.put("realm_id", "r1");
+ whereEquals.put("idempotency_key", "k1");
+ Map<String, Object> whereLess = new LinkedHashMap<>();
+ whereLess.put("http_status", 500);
+
+ QueryGenerator.PreparedQuery q =
+ QueryGenerator.generateUpdateQuery(
+ List.of("error_subtype", "http_status", "realm_id",
"idempotency_key", "executor_id"),
+ "idempotency_records",
+ setClause,
+ whereEquals,
+ Map.of(),
+ whereLess,
+ Set.of("executor_id"),
+ Set.of());
+
+ assertEquals(
+ "UPDATE POLARIS_SCHEMA.idempotency_records SET error_subtype = ?,
http_status = ?"
+ + " WHERE realm_id = ? AND idempotency_key = ? AND http_status < ?
AND executor_id IS NULL",
+ q.sql());
+ Assertions.assertThat(q.parameters()).containsExactly(null, 200, "r1",
"k1", 500);
+ }
+
+ @Test
+ void testGenerateUpdateQueryExtended_rejectsEmptySetClause() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ QueryGenerator.generateUpdateQuery(
+ List.of("a"),
+ "t",
+ Map.of(),
+ Map.of("a", 1),
+ Map.of(),
+ Map.of(),
+ Set.of(),
+ Set.of()));
+ }
+
+ @Test
+ void testGenerateDeleteQueryExtended_includesNullPredicatesAndLessThan() {
+ QueryGenerator.PreparedQuery q =
+ QueryGenerator.generateDeleteQuery(
+ List.of("realm_id", "expires_at", "finalized_at"),
+ "idempotency_records",
+ Map.of("realm_id", "r1"),
+ Map.of(),
+ Map.of("expires_at", 123),
+ Set.of("finalized_at"),
+ Set.of());
+
+ assertEquals(
+ "DELETE FROM POLARIS_SCHEMA.idempotency_records WHERE realm_id = ? AND
expires_at < ? AND finalized_at IS NULL",
+ q.sql());
+ Assertions.assertThat(q.parameters()).containsExactly("r1", 123);
+ }
+
+ @Test
+ void testGenerateDeleteQueryExtended_allowsRealmIdEvenIfNotInTableColumns() {
+ QueryGenerator.PreparedQuery q =
+ QueryGenerator.generateDeleteQuery(
+ List.of("id"),
+ "some_table",
+ Map.of("realm_id", "r1"),
+ Map.of(),
+ Map.of(),
+ Set.of(),
+ Set.of());
+
+ assertEquals("DELETE FROM POLARIS_SCHEMA.some_table WHERE realm_id = ?",
q.sql());
+ Assertions.assertThat(q.parameters()).containsExactly("r1");
+ }
+
+ @Test
+ void testGenerateUpdateQueryExtended_rejectsInvalidColumns() {
+ Map<String, Object> setClause = new LinkedHashMap<>();
+ setClause.put("not_a_real_column", 1);
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ QueryGenerator.generateUpdateQuery(
+ List.of("a"),
+ "t",
+ setClause,
+ Map.of("a", 1),
+ Map.of(),
+ Map.of(),
+ Set.of(),
+ Set.of()));
+ }
+
@Test
void testGenerateOverlapQuery() {
assertEquals(
diff --git
a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStoreIT.java
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java
similarity index 88%
rename from
persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStoreIT.java
rename to
persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java
index 3d92ceba7..7a19e133e 100644
---
a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStoreIT.java
+++
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java
@@ -16,6 +16,7 @@
*/
package org.apache.polaris.persistence.relational.jdbc.idempotency;
+import static
org.apache.polaris.containerspec.ContainerSpecHelper.containerSpecHelper;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.InputStream;
@@ -23,11 +24,12 @@ import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import javax.sql.DataSource;
+import org.apache.polaris.core.entity.IdempotencyRecord;
import org.apache.polaris.core.persistence.IdempotencyStore;
import org.apache.polaris.core.persistence.IdempotencyStore.HeartbeatResult;
-import org.apache.polaris.idempotency.IdempotencyRecord;
import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations;
import
org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration;
+import
org.apache.polaris.test.commons.PostgresRelationalJdbcLifeCycleManagement;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -37,14 +39,17 @@ import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
@Testcontainers
-public class PostgresIdempotencyStoreIT {
+public class RelationalJdbcIdempotencyStorePostgresIT {
@Container
private static final PostgreSQLContainer<?> POSTGRES =
- new PostgreSQLContainer<>("postgres:17.5-alpine");
+ new PostgreSQLContainer<>(
+ containerSpecHelper("postgres",
PostgresRelationalJdbcLifeCycleManagement.class)
+ .dockerImageName(null)
+ .asCompatibleSubstituteFor("postgres"));
private static DataSource dataSource;
- private static PostgresIdempotencyStore store;
+ private static RelationalJdbcIdempotencyStore store;
@BeforeAll
static void setup() throws Exception {
@@ -84,7 +89,7 @@ public class PostgresIdempotencyStoreIT {
ops.executeScript(is);
}
- store = new PostgresIdempotencyStore(dataSource, cfg);
+ store = new RelationalJdbcIdempotencyStore(dataSource, cfg);
}
@AfterAll
@@ -97,7 +102,7 @@ public class PostgresIdempotencyStoreIT {
String realm = "test-realm";
String key = "K1";
String op = "commit-table";
- String rid = "tables/ns.tbl";
+ String rid = "catalogs/1/tables/ns.tbl";
Instant now = Instant.now();
Instant exp = now.plus(Duration.ofMinutes(5));
@@ -120,7 +125,7 @@ public class PostgresIdempotencyStoreIT {
String realm = "test-realm";
String key = "K2";
String op = "commit-table";
- String rid = "tables/ns.tbl2";
+ String rid = "catalogs/1/tables/ns.tbl2";
Instant now = Instant.now();
Instant exp = now.plus(Duration.ofMinutes(5));
@@ -162,7 +167,7 @@ public class PostgresIdempotencyStoreIT {
String realm = "test-realm";
String key = "K3";
String op = "drop-table";
- String rid = "tables/ns.tbl3";
+ String rid = "catalogs/1/tables/ns.tbl3";
Instant now = Instant.now();
Instant expPast = now.minus(Duration.ofMinutes(1));
@@ -176,9 +181,9 @@ public class PostgresIdempotencyStoreIT {
String realm = "test-realm";
String key = "K4";
String op1 = "commit-table";
- String rid1 = "tables/ns.tbl4";
+ String rid1 = "catalogs/1/tables/ns.tbl4";
String op2 = "drop-table"; // different binding
- String rid2 = "tables/ns.tbl4"; // same resource, different op
+ String rid2 = "catalogs/1/tables/ns.tbl4"; // same resource, different op
Instant now = Instant.now();
Instant exp = now.plus(Duration.ofMinutes(5));
diff --git
a/polaris-core/src/main/java/org/apache/polaris/idempotency/IdempotencyRecord.java
b/polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java
similarity index 72%
rename from
polaris-core/src/main/java/org/apache/polaris/idempotency/IdempotencyRecord.java
rename to
polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java
index e4cae7ef8..9def89b15 100644
---
a/polaris-core/src/main/java/org/apache/polaris/idempotency/IdempotencyRecord.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java
@@ -14,26 +14,65 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.polaris.idempotency;
+package org.apache.polaris.core.entity;
import java.time.Instant;
+/**
+ * Immutable snapshot of an idempotency reservation and its finalization
status.
+ *
+ * <p>This is the persistence-agnostic representation used by higher layers;
storage backends map
+ * their concrete schemas into this type.
+ */
public final class IdempotencyRecord {
+
+ /** Logical tenant / realm identifier. */
private final String realmId;
+
+ /** Client-provided idempotency key. */
private final String idempotencyKey;
+
+ /** Logical operation type (e.g. {@code "commit-table"}). */
private final String operationType;
+
+ /**
+ * Request-derived, fully-qualified identifier of the affected resource (see
{@link
+ * #getNormalizedResourceId()}).
+ */
private final String normalizedResourceId;
+ /** HTTP status code returned to the client once finalized; {@code null}
while in-progress. */
private final Integer httpStatus;
+
+ /** Optional error subtype/code when the operation failed. */
private final String errorSubtype;
+
+ /** Minimal serialized representation of the response body for replay. */
private final String responseSummary;
+
+ /** Serialized representation of a small, whitelisted set of response
headers for replay. */
private final String responseHeaders;
+
+ /** Timestamp when the operation was finalized; {@code null} while
in-progress. */
private final Instant finalizedAt;
+ /** Timestamp when the record was created. */
private final Instant createdAt;
+
+ /** Timestamp when the record was last updated. */
private final Instant updatedAt;
+
+ /**
+ * Timestamp of the most recent heartbeat while in-progress; {@code null} if
never heartbeated.
+ */
private final Instant heartbeatAt;
+
+ /**
+ * Identifier of the executor that owns the in-progress reservation; {@code
null} if not owned.
+ */
private final String executorId;
+
+ /** Timestamp after which the reservation is considered expired and eligible
for purging. */
private final Instant expiresAt;
public IdempotencyRecord(
@@ -79,6 +118,17 @@ public final class IdempotencyRecord {
return operationType;
}
+ /**
+ * Normalized identifier of the resource affected by the operation.
+ *
+ * <p>This should be derived from the request (for example, a canonicalized
and fully-qualified
+ * identifier like {@code "catalogs/<catalogId>/tables/ns.tbl"}), not from a
generated internal
+ * entity id.
+ *
+ * <p>The identifier must be stable even on failure (before any entities are
created) and must be
+ * scoped to avoid false conflicts (for example, include the
catalog/warehouse identifier when
+ * applicable).
+ */
public String getNormalizedResourceId() {
return normalizedResourceId;
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyPersistenceException.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyPersistenceException.java
new file mode 100644
index 000000000..b22e2a761
--- /dev/null
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyPersistenceException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.persistence;
+
+import org.apache.polaris.core.exceptions.PolarisException;
+
+/**
+ * Raised when the {@link IdempotencyStore} fails due to an underlying
persistence/storage error.
+ *
+ * <p>This is treated as an internal server error by {@code
PolarisExceptionMapper}.
+ */
+public class IdempotencyPersistenceException extends PolarisException {
+
+ public IdempotencyPersistenceException(String message) {
+ super(message);
+ }
+
+ public IdempotencyPersistenceException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java
index cf3e9d98d..f68d7097f 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java
@@ -18,7 +18,7 @@ package org.apache.polaris.core.persistence;
import java.time.Instant;
import java.util.Optional;
-import org.apache.polaris.idempotency.IdempotencyRecord;
+import org.apache.polaris.core.entity.IdempotencyRecord;
/**
* Abstraction for persisting and querying idempotency records.