This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new d8d87a5ad JDBC: add DB-agnostic idempotency store + model (#3584)
d8d87a5ad is described below

commit d8d87a5ad6e3e1f216fa8d474c32e5555da88c01
Author: Huaxin Gao <[email protected]>
AuthorDate: Wed Feb 11 16:06:47 2026 -0800

    JDBC: add DB-agnostic idempotency store + model (#3584)
---
 persistence/relational-jdbc/build.gradle.kts       |   3 +
 .../relational/jdbc/QueryGenerator.java            | 117 +++++++++-
 .../jdbc/idempotency/PostgresIdempotencyStore.java | 255 ---------------------
 .../RelationalJdbcIdempotencyStore.java            | 250 ++++++++++++++++++++
 .../jdbc/models/ModelIdempotencyRecord.java        | 199 ++++++++++++++++
 .../relational/jdbc/QueryGeneratorTest.java        | 127 ++++++++++
 ... RelationalJdbcIdempotencyStorePostgresIT.java} |  25 +-
 .../entity}/IdempotencyRecord.java                 |  52 ++++-
 .../IdempotencyPersistenceException.java           |  37 +++
 .../polaris/core/persistence/IdempotencyStore.java |   2 +-
 10 files changed, 794 insertions(+), 273 deletions(-)

diff --git a/persistence/relational-jdbc/build.gradle.kts b/persistence/relational-jdbc/build.gradle.kts
index c3e425372..10cb2e8fe 100644
--- a/persistence/relational-jdbc/build.gradle.kts
+++ b/persistence/relational-jdbc/build.gradle.kts
@@ -47,4 +47,7 @@ dependencies {
 
   testImplementation("org.testcontainers:testcontainers-junit-jupiter")
   testImplementation("org.testcontainers:testcontainers-postgresql")
+
+  testImplementation(project(":polaris-container-spec-helper"))
+  testImplementation(project(":polaris-runtime-test-common"))
 }
diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java
index 485956ed8..a8720fda5 100644
--- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java
+++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java
@@ -192,6 +192,60 @@ public class QueryGenerator {
     return new PreparedQuery(sql, bindingParams);
   }
 
+  /**
+   * Builds an UPDATE query that updates only the specified columns and supports richer WHERE
+   * predicates (equality, greater-than, less-than, IS NULL, IS NOT NULL).
+   *
+   * <p>Callers should prefer passing an ordered map (e.g. {@link java.util.LinkedHashMap}) for the
+   * set clause so generated SQL and parameter order are consistent.
+   *
+   * @param tableColumns List of valid table columns.
+   * @param tableName Target table.
+   * @param setClause Column-value pairs to update.
+   * @param whereEquals Column-value pairs used in WHERE equality filtering.
+   * @param whereGreater Column-value pairs used in WHERE greater-than filtering.
+   * @param whereLess Column-value pairs used in WHERE less-than filtering.
+   * @param whereIsNull Columns that must be NULL.
+   * @param whereIsNotNull Columns that must be NOT NULL.
+   * @return UPDATE query with parameter bindings.
+   */
+  public static PreparedQuery generateUpdateQuery(
+      @Nonnull List<String> tableColumns,
+      @Nonnull String tableName,
+      @Nonnull Map<String, Object> setClause,
+      @Nonnull Map<String, Object> whereEquals,
+      @Nonnull Map<String, Object> whereGreater,
+      @Nonnull Map<String, Object> whereLess,
+      @Nonnull Set<String> whereIsNull,
+      @Nonnull Set<String> whereIsNotNull) {
+    if (setClause.isEmpty()) {
+      throw new IllegalArgumentException("Empty setClause");
+    }
+
+    Set<String> columns = new HashSet<>(tableColumns);
+    validateColumns(columns, setClause.keySet());
+
+    QueryFragment where =
+        generateWhereClauseExtended(
+            columns, whereEquals, whereGreater, whereLess, whereIsNull, 
whereIsNotNull);
+
+    List<String> setParts = new ArrayList<>();
+    List<Object> params = new ArrayList<>();
+    for (Map.Entry<String, Object> entry : setClause.entrySet()) {
+      setParts.add(entry.getKey() + " = ?");
+      params.add(entry.getValue());
+    }
+    params.addAll(where.parameters());
+
+    String sql =
+        "UPDATE "
+            + getFullyQualifiedTableName(tableName)
+            + " SET "
+            + String.join(", ", setParts)
+            + where.sql();
+    return new PreparedQuery(sql, params);
+  }
+
   /**
    * Builds a DELETE query with the given conditions.
    *
@@ -209,6 +263,26 @@ public class QueryGenerator {
         "DELETE FROM " + getFullyQualifiedTableName(tableName) + where.sql(), 
where.parameters());
   }
 
+  /**
+   * Builds a DELETE query that supports richer WHERE predicates (equality, greater-than, less-than,
+   * IS NULL, IS NOT NULL).
+   */
+  public static PreparedQuery generateDeleteQuery(
+      @Nonnull List<String> tableColumns,
+      @Nonnull String tableName,
+      @Nonnull Map<String, Object> whereEquals,
+      @Nonnull Map<String, Object> whereGreater,
+      @Nonnull Map<String, Object> whereLess,
+      @Nonnull Set<String> whereIsNull,
+      @Nonnull Set<String> whereIsNotNull) {
+    Set<String> columns = new HashSet<>(tableColumns);
+    QueryFragment where =
+        generateWhereClauseExtended(
+            columns, whereEquals, whereGreater, whereLess, whereIsNull, 
whereIsNotNull);
+    return new PreparedQuery(
+        "DELETE FROM " + getFullyQualifiedTableName(tableName) + where.sql(), 
where.parameters());
+  }
+
   private static PreparedQuery generateSelectQuery(
       @Nonnull List<String> columnNames,
       @Nonnull String tableName,
@@ -231,22 +305,53 @@ public class QueryGenerator {
       @Nonnull Set<String> tableColumns,
       @Nonnull Map<String, Object> whereEquals,
       @Nonnull Map<String, Object> whereGreater) {
+    return generateWhereClauseExtended(
+        tableColumns, whereEquals, whereGreater, Map.of(), Set.of(), Set.of());
+  }
+
+  private static void validateColumns(
+      @Nonnull Set<String> tableColumns, @Nonnull Set<String> columns) {
+    for (String column : columns) {
+      if (!tableColumns.contains(column) && !column.equals("realm_id")) {
+        throw new IllegalArgumentException("Invalid query column: " + column);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  static QueryFragment generateWhereClauseExtended(
+      @Nonnull Set<String> tableColumns,
+      @Nonnull Map<String, Object> whereEquals,
+      @Nonnull Map<String, Object> whereGreater,
+      @Nonnull Map<String, Object> whereLess,
+      @Nonnull Set<String> whereIsNull,
+      @Nonnull Set<String> whereIsNotNull) {
+    validateColumns(tableColumns, whereEquals.keySet());
+    validateColumns(tableColumns, whereGreater.keySet());
+    validateColumns(tableColumns, whereLess.keySet());
+    validateColumns(tableColumns, whereIsNull);
+    validateColumns(tableColumns, whereIsNotNull);
+
     List<String> conditions = new ArrayList<>();
     List<Object> parameters = new ArrayList<>();
     for (Map.Entry<String, Object> entry : whereEquals.entrySet()) {
-      if (!tableColumns.contains(entry.getKey()) && 
!entry.getKey().equals("realm_id")) {
-        throw new IllegalArgumentException("Invalid query column: " + 
entry.getKey());
-      }
       conditions.add(entry.getKey() + " = ?");
       parameters.add(entry.getValue());
     }
     for (Map.Entry<String, Object> entry : whereGreater.entrySet()) {
-      if (!tableColumns.contains(entry.getKey()) && 
!entry.getKey().equals("realm_id")) {
-        throw new IllegalArgumentException("Invalid query column: " + 
entry.getKey());
-      }
       conditions.add(entry.getKey() + " > ?");
       parameters.add(entry.getValue());
     }
+    for (Map.Entry<String, Object> entry : whereLess.entrySet()) {
+      conditions.add(entry.getKey() + " < ?");
+      parameters.add(entry.getValue());
+    }
+    for (String column : whereIsNull) {
+      conditions.add(column + " IS NULL");
+    }
+    for (String column : whereIsNotNull) {
+      conditions.add(column + " IS NOT NULL");
+    }
     String clause = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND 
", conditions);
     return new QueryFragment(clause, parameters);
   }
diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java
deleted file mode 100644
index ff826e19e..000000000
--- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStore.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.polaris.persistence.relational.jdbc.idempotency;
-
-import jakarta.annotation.Nonnull;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import javax.sql.DataSource;
-import org.apache.polaris.core.persistence.IdempotencyStore;
-import org.apache.polaris.core.persistence.IdempotencyStore.HeartbeatResult;
-import org.apache.polaris.idempotency.IdempotencyRecord;
-import org.apache.polaris.persistence.relational.jdbc.DatabaseType;
-import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations;
-import org.apache.polaris.persistence.relational.jdbc.QueryGenerator;
-import 
org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration;
-import org.apache.polaris.persistence.relational.jdbc.models.Converter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Postgres implementation of IdempotencyStore. */
-public final class PostgresIdempotencyStore implements IdempotencyStore {
-  private static final Logger LOG = 
LoggerFactory.getLogger(PostgresIdempotencyStore.class);
-
-  private static final String TABLE = "POLARIS_SCHEMA.idempotency_records";
-
-  private final DatasourceOperations ops;
-
-  public PostgresIdempotencyStore(
-      @Nonnull DataSource dataSource, @Nonnull RelationalJdbcConfiguration cfg)
-      throws SQLException {
-    this.ops = new DatasourceOperations(dataSource, cfg);
-  }
-
-  @Override
-  public ReserveResult reserve(
-      String realmId,
-      String idempotencyKey,
-      String operationType,
-      String normalizedResourceId,
-      Instant expiresAt,
-      String executorId,
-      Instant now) {
-    String sql =
-        "INSERT INTO "
-            + TABLE
-            + " (realm_id, idempotency_key, operation_type, resource_id,"
-            + " http_status, error_subtype, response_summary, 
response_headers, finalized_at,"
-            + " created_at, updated_at, heartbeat_at, executor_id, expires_at)"
-            + " VALUES (?, ?, ?, ?, NULL, NULL, NULL, NULL, NULL, ?, ?, ?, ?, 
?)"
-            + " ON CONFLICT (realm_id, idempotency_key) DO NOTHING";
-    List<Object> params =
-        List.of(
-            realmId,
-            idempotencyKey,
-            operationType,
-            normalizedResourceId,
-            Timestamp.from(now),
-            Timestamp.from(now),
-            Timestamp.from(now),
-            executorId,
-            Timestamp.from(expiresAt));
-    try {
-      int updated = ops.executeUpdate(new QueryGenerator.PreparedQuery(sql, 
params));
-      if (updated == 1) {
-        return new ReserveResult(ReserveResultType.OWNED, Optional.empty());
-      } else {
-        // Load existing to return to caller
-        return new ReserveResult(ReserveResultType.DUPLICATE, load(realmId, 
idempotencyKey));
-      }
-    } catch (SQLException e) {
-      throw new RuntimeException("Failed to reserve idempotency key", e);
-    }
-  }
-
-  @Override
-  public Optional<IdempotencyRecord> load(String realmId, String 
idempotencyKey) {
-    String sql =
-        "SELECT realm_id, idempotency_key, operation_type, resource_id, 
http_status, error_subtype,"
-            + " response_summary, response_headers, created_at, updated_at, 
finalized_at, heartbeat_at,"
-            + " executor_id, expires_at"
-            + " FROM "
-            + TABLE
-            + " WHERE realm_id = ? AND idempotency_key = ?";
-    try {
-      final IdempotencyRecord[] holder = new IdempotencyRecord[1];
-      ops.executeSelectOverStream(
-          new QueryGenerator.PreparedQuery(sql, List.of(realmId, 
idempotencyKey)),
-          new Converter<IdempotencyRecord>() {
-            @Override
-            public IdempotencyRecord fromResultSet(ResultSet rs) throws 
SQLException {
-              return convert(rs);
-            }
-
-            @Override
-            public Map<String, Object> toMap(DatabaseType databaseType) {
-              return Map.of();
-            }
-          },
-          stream -> stream.findFirst().ifPresent(r -> holder[0] = r));
-      return Optional.ofNullable(holder[0]);
-    } catch (SQLException e) {
-      throw new RuntimeException("Failed to load idempotency record", e);
-    }
-  }
-
-  @Override
-  public HeartbeatResult updateHeartbeat(
-      String realmId, String idempotencyKey, String executorId, Instant now) {
-    String sql =
-        "UPDATE "
-            + TABLE
-            + " SET heartbeat_at = ?, updated_at = ?"
-            + " WHERE realm_id = ? AND idempotency_key = ?"
-            + " AND http_status IS NULL"
-            + " AND (executor_id IS NULL OR executor_id = ?)";
-    try {
-      int rows =
-          ops.executeUpdate(
-              new QueryGenerator.PreparedQuery(
-                  sql,
-                  List.of(
-                      Timestamp.from(now),
-                      Timestamp.from(now),
-                      realmId,
-                      idempotencyKey,
-                      executorId)));
-      if (rows > 0) {
-        return HeartbeatResult.UPDATED;
-      }
-
-      // No rows updated: determine why by loading the current record, if any.
-      // TODO: consider using a single atomic read/write (for example, PostgreSQL
-      // UPDATE ... RETURNING) to avoid this follow-up lookup and make the
-      // conflicting state observable in the same operation.
-      Optional<IdempotencyRecord> existing = load(realmId, idempotencyKey);
-      if (existing.isEmpty()) {
-        return HeartbeatResult.NOT_FOUND;
-      }
-
-      IdempotencyRecord record = existing.get();
-      if (record.getHttpStatus() != null) {
-        return HeartbeatResult.FINALIZED;
-      }
-
-      // Record is still IN_PROGRESS but owned by a different executor.
-      return HeartbeatResult.LOST_OWNERSHIP;
-    } catch (SQLException e) {
-      throw new RuntimeException("Failed to update heartbeat", e);
-    }
-  }
-
-  @Override
-  public boolean finalizeRecord(
-      String realmId,
-      String idempotencyKey,
-      Integer httpStatus,
-      String errorSubtype,
-      String responseSummary,
-      String responseHeaders,
-      Instant finalizedAt) {
-    String sql =
-        "UPDATE "
-            + TABLE
-            + " SET http_status = ?, error_subtype = ?, response_summary = ?, 
response_headers = ?,"
-            + " finalized_at = ?, updated_at = ?"
-            + " WHERE realm_id = ? AND idempotency_key = ? AND http_status IS 
NULL";
-    try {
-      int rows =
-          ops.executeUpdate(
-              new QueryGenerator.PreparedQuery(
-                  sql,
-                  Arrays.asList(
-                      httpStatus,
-                      errorSubtype,
-                      responseSummary,
-                      responseHeaders,
-                      Timestamp.from(finalizedAt),
-                      Timestamp.from(finalizedAt),
-                      realmId,
-                      idempotencyKey)));
-      return rows > 0;
-    } catch (SQLException e) {
-      throw new RuntimeException("Failed to finalize idempotency record", e);
-    }
-  }
-
-  @Override
-  public int purgeExpired(String realmId, Instant before) {
-    String sql = "DELETE FROM " + TABLE + " WHERE realm_id = ? AND expires_at 
< ?";
-    try {
-      return ops.executeUpdate(
-          new QueryGenerator.PreparedQuery(sql, List.of(realmId, 
Timestamp.from(before))));
-    } catch (SQLException e) {
-      throw new RuntimeException("Failed to purge expired idempotency 
records", e);
-    }
-  }
-
-  private static IdempotencyRecord convert(ResultSet rs) {
-    try {
-      String realmId = rs.getString("realm_id");
-      String idempotencyKey = rs.getString("idempotency_key");
-      String operationType = rs.getString("operation_type");
-      String resourceId = rs.getString("resource_id");
-      Integer httpStatus = (Integer) rs.getObject("http_status");
-      String errorSubtype = rs.getString("error_subtype");
-      String responseSummary = rs.getString("response_summary");
-      String responseHeaders = rs.getString("response_headers");
-      Instant createdAt = rs.getTimestamp("created_at").toInstant();
-      Instant updatedAt = rs.getTimestamp("updated_at").toInstant();
-      Timestamp fts = rs.getTimestamp("finalized_at");
-      Instant finalizedAt = fts == null ? null : fts.toInstant();
-      Timestamp hb = rs.getTimestamp("heartbeat_at");
-      Instant heartbeatAt = hb == null ? null : hb.toInstant();
-      String executorId = rs.getString("executor_id");
-      Instant expiresAt = rs.getTimestamp("expires_at").toInstant();
-      return new IdempotencyRecord(
-          realmId,
-          idempotencyKey,
-          operationType,
-          resourceId,
-          httpStatus,
-          errorSubtype,
-          responseSummary,
-          responseHeaders,
-          createdAt,
-          updatedAt,
-          finalizedAt,
-          heartbeatAt,
-          executorId,
-          expiresAt);
-    } catch (SQLException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}
diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java
new file mode 100644
index 000000000..6f3143aef
--- /dev/null
+++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.polaris.persistence.relational.jdbc.idempotency;
+
+import jakarta.annotation.Nonnull;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import javax.sql.DataSource;
+import org.apache.polaris.core.entity.IdempotencyRecord;
+import org.apache.polaris.core.persistence.IdempotencyPersistenceException;
+import org.apache.polaris.core.persistence.IdempotencyStore;
+import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations;
+import org.apache.polaris.persistence.relational.jdbc.QueryGenerator;
+import 
org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration;
+import org.apache.polaris.persistence.relational.jdbc.models.Converter;
+import 
org.apache.polaris.persistence.relational.jdbc.models.ModelIdempotencyRecord;
+
+public class RelationalJdbcIdempotencyStore implements IdempotencyStore {
+
+  private final DatasourceOperations datasourceOperations;
+
+  public RelationalJdbcIdempotencyStore(
+      @Nonnull DataSource dataSource, @Nonnull RelationalJdbcConfiguration cfg)
+      throws SQLException {
+    this.datasourceOperations = new DatasourceOperations(dataSource, cfg);
+  }
+
+  @Override
+  public ReserveResult reserve(
+      String realmId,
+      String idempotencyKey,
+      String operationType,
+      String normalizedResourceId,
+      Instant expiresAt,
+      String executorId,
+      Instant now) {
+    try {
+      // Build insert values directly to avoid requiring an Immutables-generated model type.
+      Map<String, Object> insertMap = new LinkedHashMap<>();
+      insertMap.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey);
+      insertMap.put(ModelIdempotencyRecord.OPERATION_TYPE, operationType);
+      insertMap.put(ModelIdempotencyRecord.RESOURCE_ID, normalizedResourceId);
+      insertMap.put(ModelIdempotencyRecord.HTTP_STATUS, null);
+      insertMap.put(ModelIdempotencyRecord.ERROR_SUBTYPE, null);
+      insertMap.put(ModelIdempotencyRecord.RESPONSE_SUMMARY, null);
+      insertMap.put(ModelIdempotencyRecord.RESPONSE_HEADERS, null);
+      insertMap.put(ModelIdempotencyRecord.FINALIZED_AT, null);
+      insertMap.put(ModelIdempotencyRecord.CREATED_AT, Timestamp.from(now));
+      insertMap.put(ModelIdempotencyRecord.UPDATED_AT, Timestamp.from(now));
+      insertMap.put(ModelIdempotencyRecord.HEARTBEAT_AT, Timestamp.from(now));
+      insertMap.put(ModelIdempotencyRecord.EXECUTOR_ID, executorId);
+      insertMap.put(ModelIdempotencyRecord.EXPIRES_AT, 
Timestamp.from(expiresAt));
+
+      List<Object> values = insertMap.values().stream().toList();
+      QueryGenerator.PreparedQuery insert =
+          QueryGenerator.generateInsertQuery(
+              ModelIdempotencyRecord.ALL_COLUMNS,
+              ModelIdempotencyRecord.TABLE_NAME,
+              values,
+              realmId);
+      datasourceOperations.executeUpdate(insert);
+      return new ReserveResult(ReserveResultType.OWNED, Optional.empty());
+    } catch (SQLException e) {
+      if (datasourceOperations.isConstraintViolation(e)) {
+        return new ReserveResult(ReserveResultType.DUPLICATE, load(realmId, 
idempotencyKey));
+      }
+      throw new IdempotencyPersistenceException("Failed to reserve idempotency 
key", e);
+    }
+  }
+
+  @Override
+  public Optional<IdempotencyRecord> load(String realmId, String 
idempotencyKey) {
+    try {
+      QueryGenerator.PreparedQuery query =
+          QueryGenerator.generateSelectQuery(
+              ModelIdempotencyRecord.ALL_COLUMNS,
+              ModelIdempotencyRecord.TABLE_NAME,
+              Map.of(
+                  ModelIdempotencyRecord.REALM_ID,
+                  realmId,
+                  ModelIdempotencyRecord.IDEMPOTENCY_KEY,
+                  idempotencyKey));
+      List<IdempotencyRecord> results =
+          datasourceOperations.executeSelect(
+              query,
+              new Converter<>() {
+                @Override
+                public IdempotencyRecord fromResultSet(ResultSet rs) throws 
SQLException {
+                  return ModelIdempotencyRecord.fromRow(realmId, rs);
+                }
+
+                @Override
+                public Map<String, Object> toMap(
+                    
org.apache.polaris.persistence.relational.jdbc.DatabaseType databaseType) {
+                  throw new UnsupportedOperationException("Not used for SELECT 
conversion");
+                }
+              });
+      if (results.isEmpty()) {
+        return Optional.empty();
+      }
+      if (results.size() > 1) {
+        throw new IllegalStateException(
+            "More than one idempotency record found for realm/key: "
+                + realmId
+                + "/"
+                + idempotencyKey);
+      }
+      return Optional.of(results.getFirst());
+    } catch (SQLException e) {
+      throw new IdempotencyPersistenceException("Failed to load idempotency 
record", e);
+    }
+  }
+
+  @Override
+  public HeartbeatResult updateHeartbeat(
+      String realmId, String idempotencyKey, String executorId, Instant now) {
+    Optional<IdempotencyRecord> existing = load(realmId, idempotencyKey);
+    if (existing.isEmpty()) {
+      return HeartbeatResult.NOT_FOUND;
+    }
+
+    IdempotencyRecord record = existing.get();
+    if (record.getHttpStatus() != null) {
+      return HeartbeatResult.FINALIZED;
+    }
+    if (record.getExecutorId() == null || 
!record.getExecutorId().equals(executorId)) {
+      return HeartbeatResult.LOST_OWNERSHIP;
+    }
+
+    QueryGenerator.PreparedQuery update =
+        QueryGenerator.generateUpdateQuery(
+            ModelIdempotencyRecord.ALL_COLUMNS,
+            ModelIdempotencyRecord.TABLE_NAME,
+            Map.of(
+                ModelIdempotencyRecord.HEARTBEAT_AT,
+                Timestamp.from(now),
+                ModelIdempotencyRecord.UPDATED_AT,
+                Timestamp.from(now)),
+            Map.of(
+                ModelIdempotencyRecord.REALM_ID,
+                realmId,
+                ModelIdempotencyRecord.IDEMPOTENCY_KEY,
+                idempotencyKey,
+                ModelIdempotencyRecord.EXECUTOR_ID,
+                executorId),
+            Map.of(),
+            Map.of(),
+            Set.of(ModelIdempotencyRecord.HTTP_STATUS),
+            Set.of());
+
+    try {
+      int updated = datasourceOperations.executeUpdate(update);
+      if (updated > 0) {
+        return HeartbeatResult.UPDATED;
+      }
+    } catch (SQLException e) {
+      throw new IdempotencyPersistenceException("Failed to update idempotency 
heartbeat", e);
+    }
+
+    // Raced with finalize/ownership loss; re-check to return a meaningful result.
+    Optional<IdempotencyRecord> after = load(realmId, idempotencyKey);
+    if (after.isEmpty()) {
+      return HeartbeatResult.NOT_FOUND;
+    }
+    if (after.get().getHttpStatus() != null) {
+      return HeartbeatResult.FINALIZED;
+    }
+    return HeartbeatResult.LOST_OWNERSHIP;
+  }
+
+  @Override
+  public boolean finalizeRecord(
+      String realmId,
+      String idempotencyKey,
+      Integer httpStatus,
+      String errorSubtype,
+      String responseSummary,
+      String responseHeaders,
+      Instant finalizedAt) {
+    // Use ordered/set maps so we can include nullable values (Map.of disallows nulls).
+    Map<String, Object> setClause = new LinkedHashMap<>();
+    setClause.put(ModelIdempotencyRecord.HTTP_STATUS, httpStatus);
+    setClause.put(ModelIdempotencyRecord.ERROR_SUBTYPE, errorSubtype);
+    setClause.put(ModelIdempotencyRecord.RESPONSE_SUMMARY, responseSummary);
+    setClause.put(ModelIdempotencyRecord.RESPONSE_HEADERS, responseHeaders);
+    setClause.put(ModelIdempotencyRecord.FINALIZED_AT, 
Timestamp.from(finalizedAt));
+    setClause.put(ModelIdempotencyRecord.UPDATED_AT, 
Timestamp.from(finalizedAt));
+
+    Map<String, Object> whereEquals = new HashMap<>();
+    whereEquals.put(ModelIdempotencyRecord.REALM_ID, realmId);
+    whereEquals.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey);
+
+    QueryGenerator.PreparedQuery update =
+        QueryGenerator.generateUpdateQuery(
+            ModelIdempotencyRecord.ALL_COLUMNS,
+            ModelIdempotencyRecord.TABLE_NAME,
+            setClause,
+            whereEquals,
+            Map.of(),
+            Map.of(),
+            Set.of(ModelIdempotencyRecord.HTTP_STATUS),
+            Set.of());
+
+    try {
+      return datasourceOperations.executeUpdate(update) > 0;
+    } catch (SQLException e) {
+      throw new IdempotencyPersistenceException("Failed to finalize 
idempotency record", e);
+    }
+  }
+
+  @Override
+  public int purgeExpired(String realmId, Instant before) {
+    try {
+      QueryGenerator.PreparedQuery delete =
+          QueryGenerator.generateDeleteQuery(
+              ModelIdempotencyRecord.ALL_COLUMNS,
+              ModelIdempotencyRecord.TABLE_NAME,
+              Map.of(ModelIdempotencyRecord.REALM_ID, realmId),
+              Map.of(),
+              Map.of(ModelIdempotencyRecord.EXPIRES_AT, 
Timestamp.from(before)),
+              Set.of(),
+              Set.of(ModelIdempotencyRecord.EXPIRES_AT));
+      return datasourceOperations.executeUpdate(delete);
+    } catch (SQLException e) {
+      throw new IdempotencyPersistenceException("Failed to purge expired 
idempotency records", e);
+    }
+  }
+}
diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java
new file mode 100644
index 000000000..74473cc24
--- /dev/null
+++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.persistence.relational.jdbc.models;
+
+import jakarta.annotation.Nullable;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.polaris.core.entity.IdempotencyRecord;
+import org.apache.polaris.persistence.relational.jdbc.DatabaseType;
+
+/**
+ * JDBC model for {@link IdempotencyRecord} mirroring the {@code idempotency_records} table.
+ *
+ * <p>This follows the same pattern as {@link ModelEvent}, separating the storage representation
+ * from the core domain model while still providing {@link Converter} helpers.
+ *
+ * <p>Note: {@code realm_id} is treated as an implicit column across relational-jdbc: callers can
+ * filter on it in WHERE clauses even if it is not included in the projection list.
+ */
+public interface ModelIdempotencyRecord extends Converter<IdempotencyRecord> {
+
+  String TABLE_NAME = "idempotency_records";
+
+  // Logical tenant / realm identifier.
+  String REALM_ID = "realm_id";
+  // Client-provided idempotency key.
+  String IDEMPOTENCY_KEY = "idempotency_key";
+  // Logical operation type (e.g. commit-table).
+  String OPERATION_TYPE = "operation_type";
+  // Normalized identifier of the affected resource.
+  String RESOURCE_ID = "resource_id";
+
+  // Final HTTP status code once the operation is completed (null while in-progress).
+  String HTTP_STATUS = "http_status";
+  // Optional error subtype for failures.
+  String ERROR_SUBTYPE = "error_subtype";
+  // Short serialized representation of the response body.
+  String RESPONSE_SUMMARY = "response_summary";
+  // Serialized representation of response headers.
+  String RESPONSE_HEADERS = "response_headers";
+  // Timestamp when the operation was finalized (null while in-progress).
+  String FINALIZED_AT = "finalized_at";
+
+  // Timestamp when the record was created.
+  String CREATED_AT = "created_at";
+  // Timestamp when the record was last updated.
+  String UPDATED_AT = "updated_at";
+  // Timestamp for the last heartbeat update (null if no heartbeat recorded).
+  String HEARTBEAT_AT = "heartbeat_at";
+  // Identifier of the executor that owns the in-progress record (null if not owned).
+  String EXECUTOR_ID = "executor_id";
+  // Expiration timestamp after which the record can be considered stale/purgeable.
+  String EXPIRES_AT = "expires_at";
+
+  /** Columns handled by this model. {@link #REALM_ID} is not part of the list. */
+  List<String> ALL_COLUMNS =
+      List.of(
+          IDEMPOTENCY_KEY,
+          OPERATION_TYPE,
+          RESOURCE_ID,
+          HTTP_STATUS,
+          ERROR_SUBTYPE,
+          RESPONSE_SUMMARY,
+          RESPONSE_HEADERS,
+          FINALIZED_AT,
+          CREATED_AT,
+          UPDATED_AT,
+          HEARTBEAT_AT,
+          EXECUTOR_ID,
+          EXPIRES_AT);
+
+  String getRealmId();
+
+  String getIdempotencyKey();
+
+  String getOperationType();
+
+  String getResourceId();
+
+  @Nullable
+  Integer getHttpStatus();
+
+  @Nullable
+  String getErrorSubtype();
+
+  @Nullable
+  String getResponseSummary();
+
+  @Nullable
+  String getResponseHeaders();
+
+  @Nullable
+  Instant getFinalizedAt();
+
+  Instant getCreatedAt();
+
+  Instant getUpdatedAt();
+
+  @Nullable
+  Instant getHeartbeatAt();
+
+  @Nullable
+  String getExecutorId();
+
+  Instant getExpiresAt();
+
+  /**
+   * Converts the current row by delegating to {@link #fromRow(ResultSet)}.
+   *
+   * <p>Because that overload reads {@code realm_id} from the row, the query feeding this method
+   * must project {@code realm_id} in addition to {@link #ALL_COLUMNS}.
+   */
+  @Override
+  default IdempotencyRecord fromResultSet(ResultSet rs) throws SQLException {
+    return fromRow(rs);
+  }
+
+  /** Convert the current ResultSet row into an {@link IdempotencyRecord}. */
+  static IdempotencyRecord fromRow(ResultSet rs) throws SQLException {
+    // Requires realm_id to be projected in the ResultSet.
+    return fromRow(rs.getString(REALM_ID), rs);
+  }
+
+  /**
+   * Convert the current ResultSet row into an {@link IdempotencyRecord}, using {@code realmId} from
+   * call context (so callers can project only {@link #ALL_COLUMNS}).
+   *
+   * @param realmId realm to stamp on the returned record; it is not read from {@code rs}
+   * @param rs result set positioned on the row to convert
+   * @return the record built from the current row
+   * @throws SQLException if a column cannot be read
+   */
+  static IdempotencyRecord fromRow(String realmId, ResultSet rs) throws SQLException {
+    String idempotencyKey = rs.getString(IDEMPOTENCY_KEY);
+    String operationType = rs.getString(OPERATION_TYPE);
+    String resourceId = rs.getString(RESOURCE_ID);
+
+    // Read through getInt()/wasNull() instead of casting getObject(): a driver may materialize an
+    // integer column as Short, Long or BigDecimal, in which case an (Integer) cast would fail.
+    int httpStatusValue = rs.getInt(HTTP_STATUS);
+    Integer httpStatus = rs.wasNull() ? null : httpStatusValue;
+    String errorSubtype = rs.getString(ERROR_SUBTYPE);
+    String responseSummary = rs.getString(RESPONSE_SUMMARY);
+    String responseHeaders = rs.getString(RESPONSE_HEADERS);
+
+    // NOTE(review): created_at, updated_at and expires_at are dereferenced without a null check,
+    // so these columns are assumed to be NOT NULL in the schema - confirm.
+    Instant createdAt = rs.getTimestamp(CREATED_AT).toInstant();
+    Instant updatedAt = rs.getTimestamp(UPDATED_AT).toInstant();
+
+    Timestamp finalizedTs = rs.getTimestamp(FINALIZED_AT);
+    Instant finalizedAt = finalizedTs == null ? null : finalizedTs.toInstant();
+
+    Timestamp heartbeatTs = rs.getTimestamp(HEARTBEAT_AT);
+    Instant heartbeatAt = heartbeatTs == null ? null : heartbeatTs.toInstant();
+
+    String executorId = rs.getString(EXECUTOR_ID);
+    Instant expiresAt = rs.getTimestamp(EXPIRES_AT).toInstant();
+
+    return new IdempotencyRecord(
+        realmId,
+        idempotencyKey,
+        operationType,
+        resourceId,
+        httpStatus,
+        errorSubtype,
+        responseSummary,
+        responseHeaders,
+        createdAt,
+        updatedAt,
+        finalizedAt,
+        heartbeatAt,
+        executorId,
+        expiresAt);
+  }
+
+  /**
+   * Builds the column-to-value map for this record, in the same order as {@link #ALL_COLUMNS}.
+   *
+   * <p>{@code realm_id} is not included in the map, and {@code databaseType} is not consulted.
+   * Instants are converted to {@link Timestamp}; absent optional values are stored as {@code null}.
+   */
+  @Override
+  default Map<String, Object> toMap(DatabaseType databaseType) {
+    // LinkedHashMap keeps the insertion order and, unlike Map.of, accepts null values.
+    Map<String, Object> map = new LinkedHashMap<>();
+    map.put(IDEMPOTENCY_KEY, getIdempotencyKey());
+    map.put(OPERATION_TYPE, getOperationType());
+    map.put(RESOURCE_ID, getResourceId());
+    map.put(HTTP_STATUS, getHttpStatus());
+    map.put(ERROR_SUBTYPE, getErrorSubtype());
+    map.put(RESPONSE_SUMMARY, getResponseSummary());
+    map.put(RESPONSE_HEADERS, getResponseHeaders());
+    map.put(FINALIZED_AT, getFinalizedAt() == null ? null : Timestamp.from(getFinalizedAt()));
+    map.put(CREATED_AT, Timestamp.from(getCreatedAt()));
+    map.put(UPDATED_AT, Timestamp.from(getUpdatedAt()));
+    map.put(HEARTBEAT_AT, getHeartbeatAt() == null ? null : Timestamp.from(getHeartbeatAt()));
+    map.put(EXECUTOR_ID, getExecutorId());
+    map.put(EXPIRES_AT, Timestamp.from(getExpiresAt()));
+    return map;
+  }
+}
diff --git 
a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java
 
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java
index d31b07788..38a06f2b5 100644
--- 
a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java
+++ 
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java
@@ -26,6 +26,8 @@ import static org.mockito.Mockito.when;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -220,6 +222,131 @@ public class QueryGeneratorTest {
     assertEquals("", QueryGenerator.generateWhereClause(Set.of(), whereClause, 
Map.of()).sql());
   }
 
+  @Test
+  void testGenerateWhereClauseExtended_allPredicatesAndStableParameterOrder() {
+    // One predicate of every supported kind, each held in an insertion-ordered collection.
+    Set<String> nullColumns = new LinkedHashSet<>(List.of("d"));
+    Set<String> notNullColumns = new LinkedHashSet<>(List.of("e"));
+
+    Map<String, Object> equalTo = new LinkedHashMap<>();
+    equalTo.put("a", "A");
+    Map<String, Object> greaterThan = new LinkedHashMap<>();
+    greaterThan.put("b", 2);
+    Map<String, Object> lessThan = new LinkedHashMap<>();
+    lessThan.put("c", 3);
+
+    QueryGenerator.QueryFragment clause =
+        QueryGenerator.generateWhereClauseExtended(
+            Set.of("a", "b", "c", "d", "e"),
+            equalTo,
+            greaterThan,
+            lessThan,
+            nullColumns,
+            notNullColumns);
+
+    // Only the comparison predicates bind parameters; the NULL checks contribute none.
+    String expectedSql = " WHERE a = ? AND b > ? AND c < ? AND d IS NULL AND e IS NOT NULL";
+    assertEquals(expectedSql, clause.sql());
+    Assertions.assertThat(clause.parameters()).containsExactly("A", 2, 3);
+  }
+
+  @Test
+  void testGenerateUpdateQueryExtended_supportsNullSetValues() {
+    List<String> tableColumns =
+        List.of("error_subtype", "http_status", "realm_id", "idempotency_key", "executor_id");
+
+    // Ordered maps make both the SET list and the WHERE clause deterministic.
+    Map<String, Object> assignments = new LinkedHashMap<>();
+    assignments.put("error_subtype", null);
+    assignments.put("http_status", 200);
+
+    Map<String, Object> equalTo = new LinkedHashMap<>();
+    equalTo.put("realm_id", "r1");
+    equalTo.put("idempotency_key", "k1");
+
+    Map<String, Object> lessThan = new LinkedHashMap<>();
+    lessThan.put("http_status", 500);
+
+    QueryGenerator.PreparedQuery updateQuery =
+        QueryGenerator.generateUpdateQuery(
+            tableColumns,
+            "idempotency_records",
+            assignments,
+            equalTo,
+            Map.of(),
+            lessThan,
+            Set.of("executor_id"),
+            Set.of());
+
+    String expectedSql =
+        "UPDATE POLARIS_SCHEMA.idempotency_records SET error_subtype = ?, http_status = ?"
+            + " WHERE realm_id = ? AND idempotency_key = ?"
+            + " AND http_status < ? AND executor_id IS NULL";
+    assertEquals(expectedSql, updateQuery.sql());
+    // The null SET value is bound as a parameter, ahead of the WHERE parameters.
+    Assertions.assertThat(updateQuery.parameters()).containsExactly(null, 200, "r1", "k1", 500);
+  }
+
+  @Test
+  void testGenerateUpdateQueryExtended_rejectsEmptySetClause() {
+    // An UPDATE with nothing to assign must be refused with an IllegalArgumentException.
+    Map<String, Object> noAssignments = Map.of();
+    Map<String, Object> equalTo = Map.of("a", 1);
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            QueryGenerator.generateUpdateQuery(
+                List.of("a"), "t", noAssignments, equalTo, Map.of(), Map.of(), Set.of(), Set.of()));
+  }
+
+  @Test
+  void testGenerateDeleteQueryExtended_includesNullPredicatesAndLessThan() {
+    // Combines an equality filter, a less-than filter and an IS NULL check in one DELETE.
+    Map<String, Object> equalTo = Map.of("realm_id", "r1");
+    Map<String, Object> lessThan = Map.of("expires_at", 123);
+    Set<String> nullColumns = Set.of("finalized_at");
+
+    QueryGenerator.PreparedQuery deleteQuery =
+        QueryGenerator.generateDeleteQuery(
+            List.of("realm_id", "expires_at", "finalized_at"),
+            "idempotency_records",
+            equalTo,
+            Map.of(),
+            lessThan,
+            nullColumns,
+            Set.of());
+
+    String expectedSql =
+        "DELETE FROM POLARIS_SCHEMA.idempotency_records"
+            + " WHERE realm_id = ? AND expires_at < ? AND finalized_at IS NULL";
+    assertEquals(expectedSql, deleteQuery.sql());
+    Assertions.assertThat(deleteQuery.parameters()).containsExactly("r1", 123);
+  }
+
+  @Test
+  void testGenerateDeleteQueryExtended_allowsRealmIdEvenIfNotInTableColumns() {
+    // realm_id is absent from the declared columns yet must still be accepted as a filter.
+    List<String> declaredColumns = List.of("id");
+    Map<String, Object> realmFilter = Map.of("realm_id", "r1");
+
+    QueryGenerator.PreparedQuery deleteQuery =
+        QueryGenerator.generateDeleteQuery(
+            declaredColumns, "some_table", realmFilter, Map.of(), Map.of(), Set.of(), Set.of());
+
+    assertEquals("DELETE FROM POLARIS_SCHEMA.some_table WHERE realm_id = ?", deleteQuery.sql());
+    Assertions.assertThat(deleteQuery.parameters()).containsExactly("r1");
+  }
+
+  @Test
+  void testGenerateUpdateQueryExtended_rejectsInvalidColumns() {
+    // The SET clause names a column that is not among the declared table columns.
+    List<String> declaredColumns = List.of("a");
+    Map<String, Object> equalTo = Map.of("a", 1);
+    Map<String, Object> unknownAssignment = new LinkedHashMap<>();
+    unknownAssignment.put("not_a_real_column", 1);
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            QueryGenerator.generateUpdateQuery(
+                declaredColumns,
+                "t",
+                unknownAssignment,
+                equalTo,
+                Map.of(),
+                Map.of(),
+                Set.of(),
+                Set.of()));
+  }
+
   @Test
   void testGenerateOverlapQuery() {
     assertEquals(
diff --git 
a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStoreIT.java
 
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java
similarity index 88%
rename from 
persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStoreIT.java
rename to 
persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java
index 3d92ceba7..7a19e133e 100644
--- 
a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/PostgresIdempotencyStoreIT.java
+++ 
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java
@@ -16,6 +16,7 @@
  */
 package org.apache.polaris.persistence.relational.jdbc.idempotency;
 
+import static 
org.apache.polaris.containerspec.ContainerSpecHelper.containerSpecHelper;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.InputStream;
@@ -23,11 +24,12 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.Optional;
 import javax.sql.DataSource;
+import org.apache.polaris.core.entity.IdempotencyRecord;
 import org.apache.polaris.core.persistence.IdempotencyStore;
 import org.apache.polaris.core.persistence.IdempotencyStore.HeartbeatResult;
-import org.apache.polaris.idempotency.IdempotencyRecord;
 import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations;
 import 
org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration;
+import 
org.apache.polaris.test.commons.PostgresRelationalJdbcLifeCycleManagement;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -37,14 +39,17 @@ import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 @Testcontainers
-public class PostgresIdempotencyStoreIT {
+public class RelationalJdbcIdempotencyStorePostgresIT {
 
   @Container
   private static final PostgreSQLContainer<?> POSTGRES =
-      new PostgreSQLContainer<>("postgres:17.5-alpine");
+      new PostgreSQLContainer<>(
+          containerSpecHelper("postgres", 
PostgresRelationalJdbcLifeCycleManagement.class)
+              .dockerImageName(null)
+              .asCompatibleSubstituteFor("postgres"));
 
   private static DataSource dataSource;
-  private static PostgresIdempotencyStore store;
+  private static RelationalJdbcIdempotencyStore store;
 
   @BeforeAll
   static void setup() throws Exception {
@@ -84,7 +89,7 @@ public class PostgresIdempotencyStoreIT {
       ops.executeScript(is);
     }
 
-    store = new PostgresIdempotencyStore(dataSource, cfg);
+    store = new RelationalJdbcIdempotencyStore(dataSource, cfg);
   }
 
   @AfterAll
@@ -97,7 +102,7 @@ public class PostgresIdempotencyStoreIT {
     String realm = "test-realm";
     String key = "K1";
     String op = "commit-table";
-    String rid = "tables/ns.tbl";
+    String rid = "catalogs/1/tables/ns.tbl";
     Instant now = Instant.now();
     Instant exp = now.plus(Duration.ofMinutes(5));
 
@@ -120,7 +125,7 @@ public class PostgresIdempotencyStoreIT {
     String realm = "test-realm";
     String key = "K2";
     String op = "commit-table";
-    String rid = "tables/ns.tbl2";
+    String rid = "catalogs/1/tables/ns.tbl2";
     Instant now = Instant.now();
     Instant exp = now.plus(Duration.ofMinutes(5));
 
@@ -162,7 +167,7 @@ public class PostgresIdempotencyStoreIT {
     String realm = "test-realm";
     String key = "K3";
     String op = "drop-table";
-    String rid = "tables/ns.tbl3";
+    String rid = "catalogs/1/tables/ns.tbl3";
     Instant now = Instant.now();
     Instant expPast = now.minus(Duration.ofMinutes(1));
 
@@ -176,9 +181,9 @@ public class PostgresIdempotencyStoreIT {
     String realm = "test-realm";
     String key = "K4";
     String op1 = "commit-table";
-    String rid1 = "tables/ns.tbl4";
+    String rid1 = "catalogs/1/tables/ns.tbl4";
     String op2 = "drop-table"; // different binding
-    String rid2 = "tables/ns.tbl4"; // same resource, different op
+    String rid2 = "catalogs/1/tables/ns.tbl4"; // same resource, different op
     Instant now = Instant.now();
     Instant exp = now.plus(Duration.ofMinutes(5));
 
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/idempotency/IdempotencyRecord.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java
similarity index 72%
rename from 
polaris-core/src/main/java/org/apache/polaris/idempotency/IdempotencyRecord.java
rename to 
polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java
index e4cae7ef8..9def89b15 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/idempotency/IdempotencyRecord.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java
@@ -14,26 +14,65 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.polaris.idempotency;
+package org.apache.polaris.core.entity;
 
 import java.time.Instant;
 
+/**
+ * Immutable snapshot of an idempotency reservation and its finalization 
status.
+ *
+ * <p>This is the persistence-agnostic representation used by higher layers; 
storage backends map
+ * their concrete schemas into this type.
+ */
 public final class IdempotencyRecord {
+
+  /** Logical tenant / realm identifier. */
   private final String realmId;
+
+  /** Client-provided idempotency key. */
   private final String idempotencyKey;
+
+  /** Logical operation type (e.g. {@code "commit-table"}). */
   private final String operationType;
+
+  /**
+   * Request-derived, fully-qualified identifier of the affected resource (see 
{@link
+   * #getNormalizedResourceId()}).
+   */
   private final String normalizedResourceId;
 
+  /** HTTP status code returned to the client once finalized; {@code null} 
while in-progress. */
   private final Integer httpStatus;
+
+  /** Optional error subtype/code when the operation failed. */
   private final String errorSubtype;
+
+  /** Minimal serialized representation of the response body for replay. */
   private final String responseSummary;
+
+  /** Serialized representation of a small, whitelisted set of response 
headers for replay. */
   private final String responseHeaders;
+
+  /** Timestamp when the operation was finalized; {@code null} while 
in-progress. */
   private final Instant finalizedAt;
 
+  /** Timestamp when the record was created. */
   private final Instant createdAt;
+
+  /** Timestamp when the record was last updated. */
   private final Instant updatedAt;
+
+  /**
+   * Timestamp of the most recent heartbeat while in-progress; {@code null} if 
never heartbeated.
+   */
   private final Instant heartbeatAt;
+
+  /**
+   * Identifier of the executor that owns the in-progress reservation; {@code 
null} if not owned.
+   */
   private final String executorId;
+
+  /** Timestamp after which the reservation is considered expired and eligible 
for purging. */
   private final Instant expiresAt;
 
   public IdempotencyRecord(
@@ -79,6 +118,17 @@ public final class IdempotencyRecord {
     return operationType;
   }
 
+  /**
+   * Normalized identifier of the resource affected by the operation.
+   *
+   * <p>This should be derived from the request (for example, a canonicalized 
and fully-qualified
+   * identifier like {@code "catalogs/<catalogId>/tables/ns.tbl"}), not from a 
generated internal
+   * entity id.
+   *
+   * <p>The identifier must be stable even on failure (before any entities are 
created) and must be
+   * scoped to avoid false conflicts (for example, include the 
catalog/warehouse identifier when
+   * applicable).
+   */
   public String getNormalizedResourceId() {
     return normalizedResourceId;
   }
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyPersistenceException.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyPersistenceException.java
new file mode 100644
index 000000000..b22e2a761
--- /dev/null
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyPersistenceException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.persistence;
+
+import org.apache.polaris.core.exceptions.PolarisException;
+
+/**
+ * Raised when the {@link IdempotencyStore} fails due to an underlying persistence/storage error.
+ *
+ * <p>This is treated as an internal server error by {@code PolarisExceptionMapper}.
+ */
+public class IdempotencyPersistenceException extends PolarisException {
+
+  /**
+   * Creates an exception without an underlying cause.
+   *
+   * @param message description of the failure, forwarded to {@link PolarisException}
+   */
+  public IdempotencyPersistenceException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates an exception that wraps the underlying failure.
+   *
+   * @param message description of the failure, forwarded to {@link PolarisException}
+   * @param cause the underlying error, forwarded to {@link PolarisException}
+   */
+  public IdempotencyPersistenceException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java
index cf3e9d98d..f68d7097f 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java
@@ -18,7 +18,7 @@ package org.apache.polaris.core.persistence;
 
 import java.time.Instant;
 import java.util.Optional;
-import org.apache.polaris.idempotency.IdempotencyRecord;
+import org.apache.polaris.core.entity.IdempotencyRecord;
 
 /**
  * Abstraction for persisting and querying idempotency records.

Reply via email to