This is an automated email from the ASF dual-hosted git repository.

singhpk234 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new f393a1dd4 JDBC: Use PreparedStatement (#1802)
f393a1dd4 is described below

commit f393a1dd4d1ff6416e37255f60ada9ac51ece13b
Author: Prashant Singh <35593236+singhpk...@users.noreply.github.com>
AuthorDate: Fri Jun 6 11:26:48 2025 -0700

    JDBC: Use PreparedStatement (#1802)
---
 persistence/relational-jdbc/build.gradle.kts       |   1 +
 .../relational/jdbc/DatasourceOperations.java      |  60 ++--
 .../relational/jdbc/JdbcBasePersistenceImpl.java   | 282 ++++++++++++++-----
 .../jdbc/JdbcMetaStoreManagerFactory.java          |  17 +-
 .../relational/jdbc/QueryGenerator.java            | 310 ++++++++++-----------
 .../relational/jdbc/models/Converter.java          |  15 +-
 .../relational/jdbc/models/ModelEntity.java        |  39 ++-
 .../relational/jdbc/models/ModelGrantRecord.java   |  18 +-
 .../jdbc/models/ModelPolicyMappingRecord.java      |  25 +-
 .../models/ModelPrincipalAuthenticationData.java   |  18 +-
 ...toreManagerWithJdbcBasePersistenceImplTest.java |   2 +-
 .../relational/jdbc/DatasourceOperationsTest.java  |  41 +--
 .../relational/jdbc/QueryGeneratorTest.java        | 166 +++++------
 13 files changed, 620 insertions(+), 374 deletions(-)

diff --git a/persistence/relational-jdbc/build.gradle.kts 
b/persistence/relational-jdbc/build.gradle.kts
index b38baf9df..2750d4e19 100644
--- a/persistence/relational-jdbc/build.gradle.kts
+++ b/persistence/relational-jdbc/build.gradle.kts
@@ -34,6 +34,7 @@ dependencies {
   compileOnly(libs.jakarta.inject.api)
 
   implementation(libs.smallrye.common.annotation) // @Identifier
+  implementation(libs.postgresql)
 
   testImplementation(libs.mockito.junit.jupiter)
   testImplementation(libs.h2)
diff --git 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java
 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java
index 522600f38..84168727c 100644
--- 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java
+++ 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java
@@ -26,6 +26,7 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -54,15 +55,23 @@ public class DatasourceOperations {
 
   private final DataSource datasource;
   private final RelationalJdbcConfiguration relationalJdbcConfiguration;
+  private final DatabaseType databaseType;
 
   private final Random random = new Random();
 
   public DatasourceOperations(
-      DataSource datasource, RelationalJdbcConfiguration 
relationalJdbcConfiguration) {
+      DataSource datasource,
+      DatabaseType databaseType,
+      RelationalJdbcConfiguration relationalJdbcConfiguration) {
     this.datasource = datasource;
+    this.databaseType = databaseType;
     this.relationalJdbcConfiguration = relationalJdbcConfiguration;
   }
 
+  public DatabaseType getDatabaseType() {
+    return databaseType;
+  }
+
   /**
    * Execute SQL script
    *
@@ -116,7 +125,8 @@ public class DatasourceOperations {
    * @param <T> : Business entity class
    * @throws SQLException : Exception during the query execution.
    */
-  public <T> List<T> executeSelect(@Nonnull String query, @Nonnull 
Converter<T> converterInstance)
+  public <T> List<T> executeSelect(
+      @Nonnull QueryGenerator.PreparedQuery query, @Nonnull Converter<T> 
converterInstance)
       throws SQLException {
     ArrayList<T> results = new ArrayList<>();
     executeSelectOverStream(query, converterInstance, stream -> 
stream.forEach(results::add));
@@ -134,18 +144,23 @@ public class DatasourceOperations {
    * @throws SQLException : Exception during the query execution.
    */
   public <T> void executeSelectOverStream(
-      @Nonnull String query,
+      @Nonnull QueryGenerator.PreparedQuery query,
       @Nonnull Converter<T> converterInstance,
       @Nonnull Consumer<Stream<T>> consumer)
       throws SQLException {
     withRetries(
         () -> {
           try (Connection connection = borrowConnection();
-              Statement statement = connection.createStatement();
-              ResultSet resultSet = statement.executeQuery(query)) {
-            ResultSetIterator<T> iterator = new ResultSetIterator<>(resultSet, 
converterInstance);
-            consumer.accept(iterator.toStream());
-            return null;
+              PreparedStatement statement = 
connection.prepareStatement(query.sql())) {
+            List<Object> params = query.parameters();
+            for (int i = 0; i < params.size(); i++) {
+              statement.setObject(i + 1, params.get(i));
+            }
+            try (ResultSet resultSet = statement.executeQuery()) {
+              ResultSetIterator<T> iterator = new 
ResultSetIterator<>(resultSet, converterInstance);
+              consumer.accept(iterator.toStream());
+              return null;
+            }
           }
         });
   }
@@ -153,19 +168,23 @@ public class DatasourceOperations {
   /**
    * Executes the UPDATE or INSERT Query
    *
-   * @param query : query to be executed
+   * @param preparedQuery : query to be executed
    * @return : Number of rows modified / inserted.
    * @throws SQLException : Exception during Query Execution.
    */
-  public int executeUpdate(String query) throws SQLException {
+  public int executeUpdate(QueryGenerator.PreparedQuery preparedQuery) throws 
SQLException {
     return withRetries(
         () -> {
           try (Connection connection = borrowConnection();
-              Statement statement = connection.createStatement()) {
+              PreparedStatement statement = 
connection.prepareStatement(preparedQuery.sql())) {
+            List<Object> params = preparedQuery.parameters();
+            for (int i = 0; i < params.size(); i++) {
+              statement.setObject(i + 1, params.get(i));
+            }
             boolean autoCommit = connection.getAutoCommit();
             connection.setAutoCommit(true);
             try {
-              return statement.executeUpdate(query);
+              return statement.executeUpdate();
             } finally {
               connection.setAutoCommit(autoCommit);
             }
@@ -188,9 +207,7 @@ public class DatasourceOperations {
             connection.setAutoCommit(false);
             try {
               try {
-                try (Statement statement = connection.createStatement()) {
-                  success = callback.execute(statement);
-                }
+                success = callback.execute(connection);
               } finally {
                 if (success) {
                   connection.commit();
@@ -206,6 +223,17 @@ public class DatasourceOperations {
         });
   }
 
+  public Integer execute(Connection connection, QueryGenerator.PreparedQuery 
preparedQuery)
+      throws SQLException {
+    try (PreparedStatement statement = 
connection.prepareStatement(preparedQuery.sql())) {
+      List<Object> params = preparedQuery.parameters();
+      for (int i = 0; i < params.size(); i++) {
+        statement.setObject(i + 1, params.get(i));
+      }
+      return statement.executeUpdate();
+    }
+  }
+
   private boolean isRetryable(SQLException e) {
     String sqlState = e.getSQLState();
 
@@ -291,7 +319,7 @@ public class DatasourceOperations {
 
   // Interface for transaction callback
   public interface TransactionCallback {
-    boolean execute(Statement statement) throws SQLException;
+    boolean execute(Connection connection) throws SQLException;
   }
 
   public boolean isConstraintViolation(SQLException e) {
diff --git 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
index 3c5dbd502..2fb0c90ca 100644
--- 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
+++ 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
@@ -18,16 +18,17 @@
  */
 package org.apache.polaris.persistence.relational.jdbc;
 
-import static org.apache.polaris.persistence.relational.jdbc.QueryGenerator.*;
+import static 
org.apache.polaris.persistence.relational.jdbc.QueryGenerator.PreparedQuery;
 
 import com.google.common.base.Preconditions;
 import jakarta.annotation.Nonnull;
 import jakarta.annotation.Nullable;
+import java.sql.Connection;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -53,6 +54,7 @@ import 
org.apache.polaris.core.persistence.pagination.HasPageSize;
 import org.apache.polaris.core.persistence.pagination.Page;
 import org.apache.polaris.core.persistence.pagination.PageToken;
 import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
+import org.apache.polaris.core.policy.PolicyEntity;
 import org.apache.polaris.core.policy.PolicyType;
 import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
 import org.apache.polaris.core.storage.PolarisStorageIntegration;
@@ -96,7 +98,14 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
       boolean nameOrParentChanged,
       PolarisBaseEntity originalEntity) {
     try {
-      persistEntity(callCtx, entity, originalEntity, 
datasourceOperations::executeUpdate);
+      persistEntity(
+          callCtx,
+          entity,
+          originalEntity,
+          null,
+          (connection, preparedQuery) -> {
+            return datasourceOperations.executeUpdate(preparedQuery);
+          });
     } catch (SQLException e) {
       throw new RuntimeException("Error persisting entity", e);
     }
@@ -109,7 +118,7 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
       List<PolarisBaseEntity> originalEntities) {
     try {
       datasourceOperations.runWithinTransaction(
-          statement -> {
+          connection -> {
             for (int i = 0; i < entities.size(); i++) {
               PolarisBaseEntity entity = entities.get(i);
               PolarisBaseEntity originalEntity =
@@ -125,7 +134,8 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
                 // already been updated after the creation.
                 continue;
               }
-              persistEntity(callCtx, entity, originalEntity, 
statement::executeUpdate);
+              persistEntity(
+                  callCtx, entity, originalEntity, connection, 
datasourceOperations::execute);
             }
             return true;
           });
@@ -141,12 +151,18 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
       @Nonnull PolarisCallContext callCtx,
       @Nonnull PolarisBaseEntity entity,
       PolarisBaseEntity originalEntity,
+      Connection connection,
       QueryAction queryAction)
       throws SQLException {
     ModelEntity modelEntity = ModelEntity.fromEntity(entity);
     if (originalEntity == null) {
       try {
-        queryAction.apply(generateInsertQuery(modelEntity, realmId));
+        List<Object> values =
+            
modelEntity.toMap(datasourceOperations.getDatabaseType()).values().stream().toList();
+        queryAction.apply(
+            connection,
+            QueryGenerator.generateInsertQuery(
+                ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, values, 
realmId));
       } catch (SQLException e) {
         if (datasourceOperations.isConstraintViolation(e)) {
           PolarisBaseEntity existingEntity =
@@ -174,7 +190,13 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
               "realm_id",
               realmId);
       try {
-        int rowsUpdated = queryAction.apply(generateUpdateQuery(modelEntity, 
params));
+        List<Object> values =
+            
modelEntity.toMap(datasourceOperations.getDatabaseType()).values().stream().toList();
+        int rowsUpdated =
+            queryAction.apply(
+                connection,
+                QueryGenerator.generateUpdateQuery(
+                    ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, values, 
params));
         if (rowsUpdated == 0) {
           throw new RetryOnConcurrencyException(
               "Entity '%s' id '%s' concurrently modified; expected version %s",
@@ -191,9 +213,12 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
   public void writeToGrantRecords(
       @Nonnull PolarisCallContext callCtx, @Nonnull PolarisGrantRecord 
grantRec) {
     ModelGrantRecord modelGrantRecord = 
ModelGrantRecord.fromGrantRecord(grantRec);
-    String query = generateInsertQuery(modelGrantRecord, realmId);
     try {
-      datasourceOperations.executeUpdate(query);
+      List<Object> values =
+          
modelGrantRecord.toMap(datasourceOperations.getDatabaseType()).values().stream().toList();
+      datasourceOperations.executeUpdate(
+          QueryGenerator.generateInsertQuery(
+              ModelGrantRecord.ALL_COLUMNS, ModelGrantRecord.TABLE_NAME, 
values, realmId));
     } catch (SQLException e) {
       throw new RuntimeException(
           String.format("Failed to write to grant records due to %s", 
e.getMessage()), e);
@@ -212,7 +237,9 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
             "realm_id",
             realmId);
     try {
-      
datasourceOperations.executeUpdate(generateDeleteQuery(ModelEntity.class, 
params));
+      datasourceOperations.executeUpdate(
+          QueryGenerator.generateDeleteQuery(
+              ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params));
     } catch (SQLException e) {
       throw new RuntimeException(
           String.format("Failed to delete entity due to %s", e.getMessage()), 
e);
@@ -223,9 +250,13 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
   public void deleteFromGrantRecords(
       @Nonnull PolarisCallContext callCtx, @Nonnull PolarisGrantRecord 
grantRec) {
     ModelGrantRecord modelGrantRecord = 
ModelGrantRecord.fromGrantRecord(grantRec);
-    String query = generateDeleteQuery(modelGrantRecord, realmId);
     try {
-      datasourceOperations.executeUpdate(query);
+      Map<String, Object> whereClause =
+          modelGrantRecord.toMap(datasourceOperations.getDatabaseType());
+      whereClause.put("realm_id", realmId);
+      datasourceOperations.executeUpdate(
+          QueryGenerator.generateDeleteQuery(
+              ModelGrantRecord.ALL_COLUMNS, ModelGrantRecord.TABLE_NAME, 
whereClause));
     } catch (SQLException e) {
       throw new RuntimeException(
           String.format("Failed to delete from grant records due to %s", 
e.getMessage()), e);
@@ -235,11 +266,12 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
   @Override
   public void deleteAllEntityGrantRecords(
       @Nonnull PolarisCallContext callCtx,
-      PolarisEntityCore entity,
+      @Nonnull PolarisEntityCore entity,
       @Nonnull List<PolarisGrantRecord> grantsOnGrantee,
       @Nonnull List<PolarisGrantRecord> grantsOnSecurable) {
     try {
-      
datasourceOperations.executeUpdate(generateDeleteQueryForEntityGrantRecords(entity,
 realmId));
+      datasourceOperations.executeUpdate(
+          QueryGenerator.generateDeleteQueryForEntityGrantRecords(entity, 
realmId));
     } catch (SQLException e) {
       throw new RuntimeException(
           String.format("Failed to delete grant records due to %s", 
e.getMessage()), e);
@@ -249,13 +281,29 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
   @Override
   public void deleteAll(@Nonnull PolarisCallContext callCtx) {
     try {
+      Map<String, Object> params = Map.of("realm_id", realmId);
       datasourceOperations.runWithinTransaction(
-          statement -> {
-            statement.executeUpdate(generateDeleteAll(ModelEntity.class, 
realmId));
-            statement.executeUpdate(generateDeleteAll(ModelGrantRecord.class, 
realmId));
-            statement.executeUpdate(
-                generateDeleteAll(ModelPrincipalAuthenticationData.class, 
realmId));
-            
statement.executeUpdate(generateDeleteAll(ModelPolicyMappingRecord.class, 
realmId));
+          connection -> {
+            datasourceOperations.execute(
+                connection,
+                QueryGenerator.generateDeleteQuery(
+                    ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params));
+            datasourceOperations.execute(
+                connection,
+                QueryGenerator.generateDeleteQuery(
+                    ModelGrantRecord.ALL_COLUMNS, ModelGrantRecord.TABLE_NAME, 
params));
+            datasourceOperations.execute(
+                connection,
+                QueryGenerator.generateDeleteQuery(
+                    ModelPrincipalAuthenticationData.ALL_COLUMNS,
+                    ModelPrincipalAuthenticationData.TABLE_NAME,
+                    params));
+            datasourceOperations.execute(
+                connection,
+                QueryGenerator.generateDeleteQuery(
+                    ModelPolicyMappingRecord.ALL_COLUMNS,
+                    ModelPolicyMappingRecord.TABLE_NAME,
+                    params));
             return true;
           });
     } catch (SQLException e) {
@@ -269,8 +317,9 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
       @Nonnull PolarisCallContext callCtx, long catalogId, long entityId, int 
typeCode) {
     Map<String, Object> params =
         Map.of("catalog_id", catalogId, "id", entityId, "type_code", typeCode, 
"realm_id", realmId);
-    String query = generateSelectQuery(new ModelEntity(), params);
-    return getPolarisBaseEntity(query);
+    return getPolarisBaseEntity(
+        QueryGenerator.generateSelectQuery(
+            ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params));
   }
 
   @Override
@@ -292,12 +341,13 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
             name,
             "realm_id",
             realmId);
-    String query = generateSelectQuery(new ModelEntity(), params);
-    return getPolarisBaseEntity(query);
+    return getPolarisBaseEntity(
+        QueryGenerator.generateSelectQuery(
+            ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params));
   }
 
   @Nullable
-  private PolarisBaseEntity getPolarisBaseEntity(String query) {
+  private PolarisBaseEntity getPolarisBaseEntity(QueryGenerator.PreparedQuery 
query) {
     try {
       var results = datasourceOperations.executeSelect(query, new 
ModelEntity());
       if (results.isEmpty()) {
@@ -321,7 +371,7 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
   public List<PolarisBaseEntity> lookupEntities(
       @Nonnull PolarisCallContext callCtx, List<PolarisEntityId> entityIds) {
     if (entityIds == null || entityIds.isEmpty()) return new ArrayList<>();
-    String query = generateSelectQueryWithEntityIds(realmId, entityIds);
+    PreparedQuery query = 
QueryGenerator.generateSelectQueryWithEntityIds(realmId, entityIds);
     try {
       return datasourceOperations.executeSelect(query, new ModelEntity());
     } catch (SQLException e) {
@@ -412,8 +462,10 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
 
     // Limit can't be pushed down, due to client side filtering
     // absence of transaction.
-    String query = QueryGenerator.generateSelectQuery(new ModelEntity(), 
params);
     try {
+      PreparedQuery query =
+          QueryGenerator.generateSelectQuery(
+              ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params);
       List<PolarisBaseEntity> results = new ArrayList<>();
       datasourceOperations.executeSelectOverStream(
           query,
@@ -442,8 +494,10 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
 
     Map<String, Object> params =
         Map.of("catalog_id", catalogId, "id", entityId, "realm_id", realmId);
-    String query = QueryGenerator.generateSelectQuery(new ModelEntity(), 
params);
-    PolarisBaseEntity b = getPolarisBaseEntity(query);
+    PolarisBaseEntity b =
+        getPolarisBaseEntity(
+            QueryGenerator.generateSelectQuery(
+                ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params));
     return b == null ? 0 : b.getGrantRecordsVersion();
   }
 
@@ -469,9 +523,12 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
             privilegeCode,
             "realm_id",
             realmId);
-    String query = generateSelectQuery(new ModelGrantRecord(), params);
     try {
-      var results = datasourceOperations.executeSelect(query, new 
ModelGrantRecord());
+      var results =
+          datasourceOperations.executeSelect(
+              QueryGenerator.generateSelectQuery(
+                  ModelGrantRecord.ALL_COLUMNS, ModelGrantRecord.TABLE_NAME, 
params),
+              new ModelGrantRecord());
       if (results.size() > 1) {
         throw new IllegalStateException(
             String.format(
@@ -498,9 +555,12 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
             securableId,
             "realm_id",
             realmId);
-    String query = generateSelectQuery(new ModelGrantRecord(), params);
     try {
-      var results = datasourceOperations.executeSelect(query, new 
ModelGrantRecord());
+      var results =
+          datasourceOperations.executeSelect(
+              QueryGenerator.generateSelectQuery(
+                  ModelGrantRecord.ALL_COLUMNS, ModelGrantRecord.TABLE_NAME, 
params),
+              new ModelGrantRecord());
       return results == null ? Collections.emptyList() : results;
     } catch (SQLException e) {
       throw new RuntimeException(
@@ -518,9 +578,12 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
     Map<String, Object> params =
         Map.of(
             "grantee_catalog_id", granteeCatalogId, "grantee_id", granteeId, 
"realm_id", realmId);
-    String query = generateSelectQuery(new ModelGrantRecord(), params);
     try {
-      var results = datasourceOperations.executeSelect(query, new 
ModelGrantRecord());
+      var results =
+          datasourceOperations.executeSelect(
+              QueryGenerator.generateSelectQuery(
+                  ModelGrantRecord.ALL_COLUMNS, ModelGrantRecord.TABLE_NAME, 
params),
+              new ModelGrantRecord());
       return results == null ? Collections.emptyList() : results;
     } catch (SQLException e) {
       throw new RuntimeException(
@@ -544,9 +607,12 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
     if (optionalEntityType != null) {
       params.put("type_code", optionalEntityType.getCode());
     }
-    String query = generateSelectQuery(new ModelEntity(), params);
     try {
-      var results = datasourceOperations.executeSelect(query, new 
ModelEntity());
+      var results =
+          datasourceOperations.executeSelect(
+              QueryGenerator.generateSelectQuery(
+                  ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params),
+              new ModelEntity());
       return results != null && !results.isEmpty();
     } catch (SQLException e) {
       throw new RuntimeException(
@@ -561,10 +627,14 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
   public PolarisPrincipalSecrets loadPrincipalSecrets(
       @Nonnull PolarisCallContext callCtx, @Nonnull String clientId) {
     Map<String, Object> params = Map.of("principal_client_id", clientId, 
"realm_id", realmId);
-    String query = generateSelectQuery(new ModelPrincipalAuthenticationData(), 
params);
     try {
       var results =
-          datasourceOperations.executeSelect(query, new 
ModelPrincipalAuthenticationData());
+          datasourceOperations.executeSelect(
+              QueryGenerator.generateSelectQuery(
+                  ModelPrincipalAuthenticationData.ALL_COLUMNS,
+                  ModelPrincipalAuthenticationData.TABLE_NAME,
+                  params),
+              new ModelPrincipalAuthenticationData());
       return results == null || results.isEmpty() ? null : results.getFirst();
     } catch (SQLException e) {
       LOGGER.error(
@@ -598,9 +668,16 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
         
ModelPrincipalAuthenticationData.fromPrincipalAuthenticationData(principalSecrets);
 
     // write new principal secrets
-    String query = generateInsertQuery(lookupPrincipalSecrets, realmId);
     try {
-      datasourceOperations.executeUpdate(query);
+      List<Object> values =
+          
lookupPrincipalSecrets.toMap(datasourceOperations.getDatabaseType()).values().stream()
+              .toList();
+      datasourceOperations.executeUpdate(
+          QueryGenerator.generateInsertQuery(
+              ModelPrincipalAuthenticationData.ALL_COLUMNS,
+              ModelPrincipalAuthenticationData.TABLE_NAME,
+              values,
+              realmId));
     } catch (SQLException e) {
       LOGGER.error(
           "Failed to generate new principal secrets for principalId: {}, due 
to {}",
@@ -654,13 +731,19 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
     }
 
     Map<String, Object> params = Map.of("principal_client_id", clientId, 
"realm_id", realmId);
-    // write back new secrets
-    String query =
-        generateUpdateQuery(
-            
ModelPrincipalAuthenticationData.fromPrincipalAuthenticationData(principalSecrets),
-            params);
     try {
-      datasourceOperations.executeUpdate(query);
+      ModelPrincipalAuthenticationData modelPrincipalAuthenticationData =
+          
ModelPrincipalAuthenticationData.fromPrincipalAuthenticationData(principalSecrets);
+      datasourceOperations.executeUpdate(
+          QueryGenerator.generateUpdateQuery(
+              ModelPrincipalAuthenticationData.ALL_COLUMNS,
+              ModelPrincipalAuthenticationData.TABLE_NAME,
+              modelPrincipalAuthenticationData
+                  .toMap(datasourceOperations.getDatabaseType())
+                  .values()
+                  .stream()
+                  .toList(),
+              params));
     } catch (SQLException e) {
       LOGGER.error(
           "Failed to rotatePrincipalSecrets  for clientId: {}, due to {}",
@@ -680,9 +763,12 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
       @Nonnull PolarisCallContext callCtx, @Nonnull String clientId, long 
principalId) {
     Map<String, Object> params =
         Map.of("principal_client_id", clientId, "principal_id", principalId, 
"realm_id", realmId);
-    String query = generateDeleteQuery(ModelPrincipalAuthenticationData.class, 
params);
     try {
-      datasourceOperations.executeUpdate(query);
+      datasourceOperations.executeUpdate(
+          QueryGenerator.generateDeleteQuery(
+              ModelPrincipalAuthenticationData.ALL_COLUMNS,
+              ModelPrincipalAuthenticationData.TABLE_NAME,
+              params));
     } catch (SQLException e) {
       LOGGER.error(
           "Failed to delete principalSecrets for clientId: {}, due to {}",
@@ -699,17 +785,28 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
       @Nonnull PolarisCallContext callCtx, @Nonnull PolarisPolicyMappingRecord 
record) {
     try {
       datasourceOperations.runWithinTransaction(
-          statement -> {
+          connection -> {
             PolicyType policyType = 
PolicyType.fromCode(record.getPolicyTypeCode());
             Preconditions.checkArgument(
                 policyType != null, "Invalid policy type code: %s", 
record.getPolicyTypeCode());
-            String insertPolicyMappingQuery =
-                generateInsertQuery(
-                    ModelPolicyMappingRecord.fromPolicyMappingRecord(record), 
realmId);
+            ModelPolicyMappingRecord modelPolicyMappingRecord =
+                ModelPolicyMappingRecord.fromPolicyMappingRecord(record);
+            List<Object> values =
+                modelPolicyMappingRecord
+                    .toMap(datasourceOperations.getDatabaseType())
+                    .values()
+                    .stream()
+                    .toList();
+            PreparedQuery insertPolicyMappingQuery =
+                QueryGenerator.generateInsertQuery(
+                    ModelPolicyMappingRecord.ALL_COLUMNS,
+                    ModelPolicyMappingRecord.TABLE_NAME,
+                    values,
+                    realmId);
             if (policyType.isInheritable()) {
-              return handleInheritablePolicy(callCtx, record, 
insertPolicyMappingQuery, statement);
+              return handleInheritablePolicy(callCtx, record, 
insertPolicyMappingQuery, connection);
             } else {
-              statement.executeUpdate(insertPolicyMappingQuery);
+              datasourceOperations.execute(connection, 
insertPolicyMappingQuery);
             }
             return true;
           });
@@ -722,8 +819,8 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
   private boolean handleInheritablePolicy(
       @Nonnull PolarisCallContext callCtx,
       @Nonnull PolarisPolicyMappingRecord record,
-      @Nonnull String insertQuery,
-      Statement statement)
+      @Nonnull PreparedQuery insertQuery,
+      Connection connection)
       throws SQLException {
     List<PolarisPolicyMappingRecord> existingRecords =
         loadPoliciesOnTargetByType(
@@ -753,13 +850,22 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
               "realm_id",
               realmId);
       // In case of the mapping exist, update the policy mapping with the new 
parameters.
-      String updateQuery =
-          generateUpdateQuery(
-              ModelPolicyMappingRecord.fromPolicyMappingRecord(record), 
updateClause);
-      statement.executeUpdate(updateQuery);
+      ModelPolicyMappingRecord modelPolicyMappingRecord =
+          ModelPolicyMappingRecord.fromPolicyMappingRecord(record);
+      PreparedQuery updateQuery =
+          QueryGenerator.generateUpdateQuery(
+              ModelPolicyMappingRecord.ALL_COLUMNS,
+              ModelPolicyMappingRecord.TABLE_NAME,
+              modelPolicyMappingRecord
+                  .toMap(datasourceOperations.getDatabaseType())
+                  .values()
+                  .stream()
+                  .toList(),
+              updateClause);
+      datasourceOperations.execute(connection, updateQuery);
     } else {
       // record doesn't exist do an insert.
-      statement.executeUpdate(insertQuery);
+      datasourceOperations.executeUpdate(insertQuery);
     }
     return true;
   }
@@ -768,9 +874,15 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
   public void deleteFromPolicyMappingRecords(
       @Nonnull PolarisCallContext callCtx, @Nonnull PolarisPolicyMappingRecord 
record) {
     var modelPolicyMappingRecord = 
ModelPolicyMappingRecord.fromPolicyMappingRecord(record);
-    String query = generateDeleteQuery(modelPolicyMappingRecord, realmId);
     try {
-      datasourceOperations.executeUpdate(query);
+      Map<String, Object> objectMap =
+          
modelPolicyMappingRecord.toMap(datasourceOperations.getDatabaseType());
+      objectMap.put("realm_id", realmId);
+      datasourceOperations.executeUpdate(
+          QueryGenerator.generateDeleteQuery(
+              ModelPolicyMappingRecord.ALL_COLUMNS,
+              ModelPolicyMappingRecord.TABLE_NAME,
+              objectMap));
     } catch (SQLException e) {
       throw new RuntimeException(
           String.format("Failed to write to policy records due to %s", 
e.getMessage()), e);
@@ -784,8 +896,22 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
       @Nonnull List<PolarisPolicyMappingRecord> mappingOnTarget,
       @Nonnull List<PolarisPolicyMappingRecord> mappingOnPolicy) {
     try {
+      Map<String, Object> queryParams = new LinkedHashMap<>();
+      if (entity.getType() == PolarisEntityType.POLICY) {
+        PolicyEntity policyEntity = PolicyEntity.of(entity);
+        queryParams.put("policy_type_code", policyEntity.getPolicyTypeCode());
+        queryParams.put("policy_catalog_id", policyEntity.getCatalogId());
+        queryParams.put("policy_id", policyEntity.getId());
+      } else {
+        queryParams.put("target_catalog_id", entity.getCatalogId());
+        queryParams.put("target_id", entity.getId());
+      }
+      queryParams.put("realm_id", realmId);
       datasourceOperations.executeUpdate(
-          generateDeleteQueryForEntityPolicyMappingRecords(entity, realmId));
+          QueryGenerator.generateDeleteQuery(
+              ModelPolicyMappingRecord.ALL_COLUMNS,
+              ModelPolicyMappingRecord.TABLE_NAME,
+              queryParams));
     } catch (SQLException e) {
       throw new RuntimeException(
           String.format("Failed to delete policy mapping records due to %s", 
e.getMessage()), e);
@@ -815,8 +941,10 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
             policyCatalogId,
             "realm_id",
             realmId);
-    String query = generateSelectQuery(new ModelPolicyMappingRecord(), params);
-    List<PolarisPolicyMappingRecord> results = 
fetchPolicyMappingRecords(query);
+    List<PolarisPolicyMappingRecord> results =
+        fetchPolicyMappingRecords(
+            QueryGenerator.generateSelectQuery(
+                ModelPolicyMappingRecord.ALL_COLUMNS, 
ModelPolicyMappingRecord.TABLE_NAME, params));
     Preconditions.checkState(results.size() <= 1, "More than one policy 
mapping records found");
     return results.size() == 1 ? results.getFirst() : null;
   }
@@ -838,8 +966,9 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
             policyTypeCode,
             "realm_id",
             realmId);
-    String query = generateSelectQuery(new ModelPolicyMappingRecord(), params);
-    return fetchPolicyMappingRecords(query);
+    return fetchPolicyMappingRecords(
+        QueryGenerator.generateSelectQuery(
+            ModelPolicyMappingRecord.ALL_COLUMNS, 
ModelPolicyMappingRecord.TABLE_NAME, params));
   }
 
   @Nonnull
@@ -848,8 +977,9 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
       @Nonnull PolarisCallContext callCtx, long targetCatalogId, long 
targetId) {
     Map<String, Object> params =
         Map.of("target_catalog_id", targetCatalogId, "target_id", targetId, 
"realm_id", realmId);
-    String query = generateSelectQuery(new ModelPolicyMappingRecord(), params);
-    return fetchPolicyMappingRecords(query);
+    return fetchPolicyMappingRecords(
+        QueryGenerator.generateSelectQuery(
+            ModelPolicyMappingRecord.ALL_COLUMNS, 
ModelPolicyMappingRecord.TABLE_NAME, params));
   }
 
   @Nonnull
@@ -869,11 +999,13 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
             policyId,
             "realm_id",
             realmId);
-    String query = generateSelectQuery(new ModelPolicyMappingRecord(), params);
-    return fetchPolicyMappingRecords(query);
+    return fetchPolicyMappingRecords(
+        QueryGenerator.generateSelectQuery(
+            ModelPolicyMappingRecord.ALL_COLUMNS, 
ModelPolicyMappingRecord.TABLE_NAME, params));
   }
 
-  private List<PolarisPolicyMappingRecord> fetchPolicyMappingRecords(String 
query) {
+  private List<PolarisPolicyMappingRecord> fetchPolicyMappingRecords(
+      QueryGenerator.PreparedQuery query) {
     try {
       var results = datasourceOperations.executeSelect(query, new 
ModelPolicyMappingRecord());
       return results == null ? Collections.emptyList() : results;
@@ -913,6 +1045,6 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
 
   @FunctionalInterface
   private interface QueryAction {
-    Integer apply(String query) throws SQLException;
+    Integer apply(Connection connection, QueryGenerator.PreparedQuery query) 
throws SQLException;
   }
 }
diff --git 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java
 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java
index aa7eff785..f94465414 100644
--- 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java
+++ 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java
@@ -93,7 +93,13 @@ public class JdbcMetaStoreManagerFactory implements 
MetaStoreManagerFactory {
 
   private void initializeForRealm(
       RealmContext realmContext, RootCredentialsSet rootCredentialsSet, 
boolean isBootstrap) {
-    DatasourceOperations databaseOperations = 
getDatasourceOperations(isBootstrap);
+    DatabaseType databaseType;
+    try {
+      databaseType = getDatabaseType();
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+    DatasourceOperations databaseOperations = 
getDatasourceOperations(isBootstrap, databaseType);
     sessionSupplierMap.put(
         realmContext.getRealmIdentifier(),
         () ->
@@ -114,14 +120,13 @@ public class JdbcMetaStoreManagerFactory implements 
MetaStoreManagerFactory {
     }
   }
 
-  private DatasourceOperations getDatasourceOperations(boolean isBootstrap) {
+  private DatasourceOperations getDatasourceOperations(
+      boolean isBootstrap, DatabaseType databaseType) {
     DatasourceOperations databaseOperations =
-        new DatasourceOperations(dataSource.get(), 
relationalJdbcConfiguration);
+        new DatasourceOperations(dataSource.get(), databaseType, 
relationalJdbcConfiguration);
     if (isBootstrap) {
       try {
-        DatabaseType databaseType = getDatabaseType();
-        databaseOperations.executeScript(
-            String.format("%s/schema-v1.sql", databaseType.getDisplayName()));
+        databaseOperations.executeScript(databaseType.getInitScriptResource());
       } catch (SQLException e) {
         throw new RuntimeException(
             String.format("Error executing sql script: %s", e.getMessage()), 
e);
diff --git 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java
 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java
index 0208dd453..1ba2ae283 100644
--- 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java
+++ 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java
@@ -20,194 +20,190 @@ package org.apache.polaris.persistence.relational.jdbc;
 
 import com.google.common.annotations.VisibleForTesting;
 import jakarta.annotation.Nonnull;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.polaris.core.entity.PolarisBaseEntity;
+import java.util.*;
+import java.util.stream.Collectors;
 import org.apache.polaris.core.entity.PolarisEntityCore;
 import org.apache.polaris.core.entity.PolarisEntityId;
-import org.apache.polaris.core.entity.PolarisEntityType;
-import org.apache.polaris.core.policy.PolicyEntity;
-import org.apache.polaris.persistence.relational.jdbc.models.Converter;
 import org.apache.polaris.persistence.relational.jdbc.models.ModelEntity;
 import org.apache.polaris.persistence.relational.jdbc.models.ModelGrantRecord;
-import 
org.apache.polaris.persistence.relational.jdbc.models.ModelPolicyMappingRecord;
-import 
org.apache.polaris.persistence.relational.jdbc.models.ModelPrincipalAuthenticationData;
 
+/**
+ * Utility class to generate parameterized SQL queries (SELECT, INSERT, 
UPDATE, DELETE). Ensures
+ * consistent SQL generation and protects against injection by managing 
parameters separately.
+ */
 public class QueryGenerator {
 
-  public static <T> String generateSelectQuery(
-      @Nonnull Converter<T> entity, @Nonnull Map<String, Object> whereClause) {
-    return generateSelectQuery(entity, generateWhereClause(whereClause));
+  /** A container for the SQL string and the ordered parameter values. */
+  public record PreparedQuery(String sql, List<Object> parameters) {}
+
+  /** A container for the query fragment SQL string and the ordered parameter 
values. */
+  record QueryFragment(String sql, List<Object> parameters) {}
+
+  /**
+   * Generates a SELECT query with projection and filtering.
+   *
+   * @param projections List of columns to retrieve.
+   * @param tableName Target table name.
+   * @param whereClause Column-value pairs used in WHERE filtering.
+   * @return A parameterized SELECT query.
+   * @throws IllegalArgumentException if any whereClause column isn't in 
projections.
+   */
+  public static PreparedQuery generateSelectQuery(
+      @Nonnull List<String> projections,
+      @Nonnull String tableName,
+      @Nonnull Map<String, Object> whereClause) {
+    QueryFragment where = generateWhereClause(new HashSet<>(projections), 
whereClause);
+    PreparedQuery query = generateSelectQuery(projections, tableName, 
where.sql());
+    return new PreparedQuery(query.sql(), where.parameters());
   }
 
-  public static String generateDeleteQueryForEntityGrantRecords(
+  /**
+   * Builds a DELETE query to remove grant records for a given entity.
+   *
+   * @param entity The target entity (either grantee or securable).
+   * @param realmId The associated realm.
+   * @return A DELETE query removing all grants for this entity.
+   */
+  public static PreparedQuery generateDeleteQueryForEntityGrantRecords(
       @Nonnull PolarisEntityCore entity, @Nonnull String realmId) {
-    String granteeCondition =
-        String.format(
-            "grantee_id = %s AND grantee_catalog_id = %s", entity.getId(), 
entity.getCatalogId());
-    String securableCondition =
-        String.format(
-            "securable_id = %s AND securable_catalog_id = %s",
-            entity.getId(), entity.getCatalogId());
-
-    String whereClause =
-        " WHERE ("
-            + granteeCondition
-            + " OR "
-            + securableCondition
-            + ") AND realm_id = '"
-            + realmId
-            + "'";
-    return generateDeleteQuery(ModelGrantRecord.class, whereClause);
+    String where =
+        """
+             WHERE (
+                (grantee_id = ? AND grantee_catalog_id = ?) OR
+                (securable_id = ? AND securable_catalog_id = ?)
+            ) AND realm_id = ?""";
+    List<Object> params =
+        Arrays.asList(
+            entity.getId(), entity.getCatalogId(), entity.getId(), 
entity.getCatalogId(), realmId);
+    return new PreparedQuery(
+        "DELETE FROM " + 
getFullyQualifiedTableName(ModelGrantRecord.TABLE_NAME) + where, params);
   }
 
-  public static String generateDeleteQueryForEntityPolicyMappingRecords(
-      @Nonnull PolarisBaseEntity entity, @Nonnull String realmId) {
-    Map<String, Object> queryParams = new HashMap<>();
-    if (entity.getType() == PolarisEntityType.POLICY) {
-      PolicyEntity policyEntity = PolicyEntity.of(entity);
-      queryParams.put("policy_type_code", policyEntity.getPolicyTypeCode());
-      queryParams.put("policy_catalog_id", policyEntity.getCatalogId());
-      queryParams.put("policy_id", policyEntity.getId());
-    } else {
-      queryParams.put("target_catalog_id", entity.getCatalogId());
-      queryParams.put("target_id", entity.getId());
-    }
-    queryParams.put("realm_id", realmId);
-
-    return generateDeleteQuery(ModelPolicyMappingRecord.class, queryParams);
-  }
-
-  public static String generateSelectQueryWithEntityIds(
+  /**
+   * Builds a SELECT query using a list of entity ID pairs (catalog_id, id).
+   *
+   * @param realmId Realm to filter by.
+   * @param entityIds List of PolarisEntityId pairs.
+   * @return SELECT query to retrieve matching entities.
+   * @throws IllegalArgumentException if entityIds is empty.
+   */
+  public static PreparedQuery generateSelectQueryWithEntityIds(
       @Nonnull String realmId, @Nonnull List<PolarisEntityId> entityIds) {
     if (entityIds.isEmpty()) {
       throw new IllegalArgumentException("Empty entity ids");
     }
-    StringBuilder condition = new StringBuilder("(catalog_id, id) IN (");
-    for (PolarisEntityId entityId : entityIds) {
-      String in = "(" + entityId.getCatalogId() + ", " + entityId.getId() + 
")";
-      condition.append(in);
-      condition.append(",");
+    String placeholders = entityIds.stream().map(e -> "(?, 
?)").collect(Collectors.joining(", "));
+    List<Object> params = new ArrayList<>();
+    for (PolarisEntityId id : entityIds) {
+      params.add(id.getCatalogId());
+      params.add(id.getId());
     }
-    // extra , removed
-    condition.deleteCharAt(condition.length() - 1);
-    condition.append(")");
-    condition.append(" AND realm_id = '").append(realmId).append("'");
-
-    return generateSelectQuery(new ModelEntity(), " WHERE " + condition);
+    params.add(realmId);
+    String where = " WHERE (catalog_id, id) IN (" + placeholders + ") AND 
realm_id = ?";
+    return new PreparedQuery(
+        generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, 
where).sql(), params);
   }
 
-  public static <T> String generateInsertQuery(
-      @Nonnull Converter<T> entity, @Nonnull String realmId) {
-    String tableName = getTableName(entity.getClass());
-    Map<String, Object> obj = entity.toMap();
-    List<String> columnNames = new ArrayList<>(obj.keySet());
-    List<String> values =
-        new ArrayList<>(obj.values().stream().map(val -> "'" + val.toString() 
+ "'").toList());
-    columnNames.add("realm_id");
-    values.add("'" + realmId + "'");
-
-    String columns = String.join(", ", columnNames);
-    String valuesString = String.join(", ", values);
-
-    return "INSERT INTO " + tableName + " (" + columns + ") VALUES (" + 
valuesString + ")";
+  /**
+   * Generates an INSERT query for a given table.
+   *
+   * @param allColumns Columns to insert values into.
+   * @param tableName Target table name.
+   * @param values Values for each column (must match order of columns).
+   * @param realmId Realm value to append.
+   * @return INSERT query with value bindings.
+   */
+  public static PreparedQuery generateInsertQuery(
+      @Nonnull List<String> allColumns,
+      @Nonnull String tableName,
+      List<Object> values,
+      String realmId) {
+    List<String> finalColumns = new ArrayList<>(allColumns);
+    List<Object> finalValues = new ArrayList<>(values);
+    finalColumns.add("realm_id");
+    finalValues.add(realmId);
+    String columns = String.join(", ", finalColumns);
+    String placeholders = finalColumns.stream().map(c -> 
"?").collect(Collectors.joining(", "));
+    String sql =
+        "INSERT INTO "
+            + getFullyQualifiedTableName(tableName)
+            + " ("
+            + columns
+            + ") VALUES ("
+            + placeholders
+            + ")";
+    return new PreparedQuery(sql, finalValues);
   }
 
-  public static <T> String generateUpdateQuery(
-      @Nonnull Converter<T> entity, @Nonnull Map<String, Object> whereClause) {
-    String tableName = getTableName(entity.getClass());
-    Map<String, Object> obj = entity.toMap();
-    List<String> setClauses = new ArrayList<>();
-    List<String> columnNames = new ArrayList<>(obj.keySet());
-    List<String> values = obj.values().stream().map(val -> "'" + 
val.toString() + "'").toList();
-
-    for (int i = 0; i < columnNames.size(); i++) {
-      setClauses.add(columnNames.get(i) + " = " + values.get(i)); // 
Placeholders
-    }
-
-    String setClausesString = String.join(", ", setClauses);
-
-    return "UPDATE " + tableName + " SET " + setClausesString + 
generateWhereClause(whereClause);
+  /**
+   * Builds an UPDATE query.
+   *
+   * @param allColumns Columns to update.
+   * @param tableName Target table.
+   * @param values New values (must match columns in order).
+   * @param whereClause Conditions for filtering rows to update.
+   * @return UPDATE query with parameter values.
+   */
+  public static PreparedQuery generateUpdateQuery(
+      @Nonnull List<String> allColumns,
+      @Nonnull String tableName,
+      @Nonnull List<Object> values,
+      @Nonnull Map<String, Object> whereClause) {
+    List<Object> bindingParams = new ArrayList<>(values);
+    QueryFragment where = generateWhereClause(new HashSet<>(allColumns), 
whereClause);
+    String setClause = allColumns.stream().map(c -> c + " = 
?").collect(Collectors.joining(", "));
+    String sql =
+        "UPDATE " + getFullyQualifiedTableName(tableName) + " SET " + 
setClause + where.sql();
+    bindingParams.addAll(where.parameters());
+    return new PreparedQuery(sql, bindingParams);
   }
 
-  public static String generateDeleteQuery(
-      @Nonnull Class<?> entityClass, @Nonnull Map<String, Object> whereClause) 
{
-    return generateDeleteQuery(entityClass, 
(generateWhereClause(whereClause)));
+  /**
+   * Builds a DELETE query with the given conditions.
+   *
+   * @param tableColumns List of valid table columns.
+   * @param tableName Target table.
+   * @param whereClause Column-value filters.
+   * @return DELETE query with parameter bindings.
+   */
+  public static PreparedQuery generateDeleteQuery(
+      @Nonnull List<String> tableColumns,
+      @Nonnull String tableName,
+      @Nonnull Map<String, Object> whereClause) {
+    QueryFragment where = generateWhereClause(new HashSet<>(tableColumns), 
whereClause);
+    return new PreparedQuery(
+        "DELETE FROM " + getFullyQualifiedTableName(tableName) + where.sql(), 
where.parameters());
   }
 
-  public static String generateDeleteQuery(
-      @Nonnull Class<?> entityClass, @Nonnull String whereClause) {
-    return "DELETE FROM " + getTableName(entityClass) + whereClause;
-  }
-
-  public static String generateDeleteAll(@Nonnull Class<?> entityClass, 
@Nonnull String realmId) {
-    String tableName = getTableName(entityClass);
-    return "DELETE FROM " + tableName + " WHERE 1 = 1 AND realm_id = '" + 
realmId + "'";
-  }
-
-  public static <T> String generateDeleteQuery(
-      @Nonnull Converter<T> entity, @Nonnull String realmId) {
-    String tableName = getTableName(entity.getClass());
-    Map<String, Object> objMap = entity.toMap();
-    objMap.put("realm_id", realmId);
-    String whereConditions = generateWhereClause(objMap);
-    return "DELETE FROM " + tableName + whereConditions;
-  }
-
-  @VisibleForTesting
-  public static <T> String generateSelectQuery(
-      @Nonnull Converter<T> entity, @Nonnull String filter) {
-    String tableName = getTableName(entity.getClass());
-    Map<String, Object> objectMap = entity.toMap();
-    String columns = String.join(", ", objectMap.keySet());
-    StringBuilder query =
-        new StringBuilder("SELECT ").append(columns).append(" FROM 
").append(tableName);
-    if (!filter.isEmpty()) {
-      query.append(filter);
-    }
-    return query.toString();
+  private static PreparedQuery generateSelectQuery(
+      @Nonnull List<String> columnNames, @Nonnull String tableName, @Nonnull 
String filter) {
+    String sql =
+        "SELECT "
+            + String.join(", ", columnNames)
+            + " FROM "
+            + getFullyQualifiedTableName(tableName)
+            + filter;
+    return new PreparedQuery(sql, Collections.emptyList());
   }
 
   @VisibleForTesting
-  public static String generateWhereClause(@Nonnull Map<String, Object> 
whereClause) {
-    List<String> whereConditions = new ArrayList<>();
-
-    if (!whereClause.isEmpty()) {
-      for (Map.Entry<String, Object> entry : whereClause.entrySet()) {
-        String fieldName = entry.getKey();
-        Object value = entry.getValue();
-        if (value instanceof String) {
-          whereConditions.add(fieldName + " = '" + value + "'");
-        } else {
-          whereConditions.add(fieldName + " = " + value);
-        }
+  static QueryFragment generateWhereClause(
+      @Nonnull Set<String> tableColumns, @Nonnull Map<String, Object> 
whereClause) {
+    List<String> conditions = new ArrayList<>();
+    List<Object> parameters = new ArrayList<>();
+    for (Map.Entry<String, Object> entry : whereClause.entrySet()) {
+      if (!tableColumns.contains(entry.getKey()) && 
!entry.getKey().equals("realm_id")) {
+        throw new IllegalArgumentException("Invalid query column: " + 
entry.getKey());
       }
+      conditions.add(entry.getKey() + " = ?");
+      parameters.add(entry.getValue());
     }
-
-    String whereConditionsString = String.join(" AND ", whereConditions);
-    return !whereConditionsString.isEmpty() ? (" WHERE " + 
whereConditionsString) : "";
+    String clause = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND 
", conditions);
+    return new QueryFragment(clause, parameters);
   }
 
-  @VisibleForTesting
-  public static String getTableName(@Nonnull Class<?> entityClass) {
-    String tableName;
-    if (entityClass.equals(ModelEntity.class)) {
-      tableName = "ENTITIES";
-    } else if (entityClass.equals(ModelGrantRecord.class)) {
-      tableName = "GRANT_RECORDS";
-    } else if (entityClass.equals(ModelPrincipalAuthenticationData.class)) {
-      tableName = "PRINCIPAL_AUTHENTICATION_DATA";
-    } else if (entityClass.equals(ModelPolicyMappingRecord.class)) {
-      tableName = "POLICY_MAPPING_RECORD";
-    } else {
-      throw new IllegalArgumentException("Unsupported entity class: " + 
entityClass.getName());
-    }
-
-    // TODO: check if we want to make schema name configurable.
-    tableName = "POLARIS_SCHEMA." + tableName;
-
-    return tableName;
+  private static String getFullyQualifiedTableName(String tableName) {
+    // TODO: make schema name configurable.
+    return "POLARIS_SCHEMA." + tableName;
   }
 }
diff --git 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/Converter.java
 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/Converter.java
index 869a9c589..228fcd451 100644
--- 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/Converter.java
+++ 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/Converter.java
@@ -21,6 +21,8 @@ package org.apache.polaris.persistence.relational.jdbc.models;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Map;
+import org.apache.polaris.persistence.relational.jdbc.DatabaseType;
+import org.postgresql.util.PGobject;
 
 public interface Converter<T> {
   /**
@@ -36,5 +38,16 @@ public interface Converter<T> {
    * Convert a model into a Map with keys as snake case names, where as values 
as values of member
    * of model obj.
    */
-  Map<String, Object> toMap();
+  Map<String, Object> toMap(DatabaseType databaseType);
+
+  default PGobject toJsonbPGobject(String props) {
+    try {
+      PGobject jsonObject = new PGobject();
+      jsonObject.setType("jsonb");
+      jsonObject.setValue(props);
+      return jsonObject;
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
diff --git 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java
 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java
index 3c27bf05c..b847a677f 100644
--- 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java
+++ 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java
@@ -20,13 +20,35 @@ package 
org.apache.polaris.persistence.relational.jdbc.models;
 
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.polaris.core.entity.PolarisBaseEntity;
 import org.apache.polaris.core.entity.PolarisEntitySubType;
 import org.apache.polaris.core.entity.PolarisEntityType;
+import org.apache.polaris.persistence.relational.jdbc.DatabaseType;
 
 public class ModelEntity implements Converter<PolarisBaseEntity> {
+  public static final String TABLE_NAME = "ENTITIES";
+
+  public static final List<String> ALL_COLUMNS =
+      List.of(
+          "id",
+          "catalog_id",
+          "parent_id",
+          "type_code",
+          "name",
+          "entity_version",
+          "sub_type_code",
+          "create_timestamp",
+          "drop_timestamp",
+          "purge_timestamp",
+          "to_purge_timestamp",
+          "last_update_timestamp",
+          "properties",
+          "internal_properties",
+          "grant_records_version");
+
   // the id of the catalog associated to that entity. use 0 if this entity is 
top-level
   // like a catalog
   private long catalogId;
@@ -164,10 +186,10 @@ public class ModelEntity implements 
Converter<PolarisBaseEntity> {
   }
 
   @Override
-  public Map<String, Object> toMap() {
-    Map<String, Object> map = new HashMap<>();
-    map.put("catalog_id", this.getCatalogId());
+  public Map<String, Object> toMap(DatabaseType databaseType) {
+    Map<String, Object> map = new LinkedHashMap<>();
     map.put("id", this.getId());
+    map.put("catalog_id", this.getCatalogId());
     map.put("parent_id", this.getParentId());
     map.put("type_code", this.getTypeCode());
     map.put("name", this.getName());
@@ -178,8 +200,13 @@ public class ModelEntity implements 
Converter<PolarisBaseEntity> {
     map.put("purge_timestamp", this.getPurgeTimestamp());
     map.put("to_purge_timestamp", this.getToPurgeTimestamp());
     map.put("last_update_timestamp", this.getLastUpdateTimestamp());
-    map.put("properties", this.getProperties());
-    map.put("internal_properties", this.getInternalProperties());
+    if (databaseType.equals(DatabaseType.POSTGRES)) {
+      map.put("properties", toJsonbPGobject(this.getProperties()));
+      map.put("internal_properties", 
toJsonbPGobject(this.getInternalProperties()));
+    } else {
+      map.put("properties", this.getProperties());
+      map.put("internal_properties", this.getInternalProperties());
+    }
     map.put("grant_records_version", this.getGrantRecordsVersion());
     return map;
   }
diff --git 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelGrantRecord.java
 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelGrantRecord.java
index 1f3fe41b3..b853dfd24 100644
--- 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelGrantRecord.java
+++ 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelGrantRecord.java
@@ -20,11 +20,23 @@ package 
org.apache.polaris.persistence.relational.jdbc.models;
 
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.polaris.core.entity.PolarisGrantRecord;
+import org.apache.polaris.persistence.relational.jdbc.DatabaseType;
 
 public class ModelGrantRecord implements Converter<PolarisGrantRecord> {
+  public static final String TABLE_NAME = "GRANT_RECORDS";
+
+  public static final List<String> ALL_COLUMNS =
+      List.of(
+          "securable_catalog_id",
+          "securable_id",
+          "grantee_catalog_id",
+          "grantee_id",
+          "privilege_code");
+
   // id of the catalog where the securable entity resides, use 0, if this 
entity is a
   // top-level account entity.
   private long securableCatalogId;
@@ -81,8 +93,8 @@ public class ModelGrantRecord implements 
Converter<PolarisGrantRecord> {
   }
 
   @Override
-  public Map<String, Object> toMap() {
-    Map<String, Object> map = new HashMap<>();
+  public Map<String, Object> toMap(DatabaseType databaseType) {
+    Map<String, Object> map = new LinkedHashMap<>();
     map.put("securable_catalog_id", this.securableCatalogId);
     map.put("securable_id", this.securableId);
     map.put("grantee_catalog_id", this.granteeCatalogId);
diff --git 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelPolicyMappingRecord.java
 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelPolicyMappingRecord.java
index 4c0ac6786..ab4faa5d4 100644
--- 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelPolicyMappingRecord.java
+++ 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelPolicyMappingRecord.java
@@ -20,11 +20,24 @@ package 
org.apache.polaris.persistence.relational.jdbc.models;
 
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
+import org.apache.polaris.persistence.relational.jdbc.DatabaseType;
 
 public class ModelPolicyMappingRecord implements 
Converter<PolarisPolicyMappingRecord> {
+  public static final String TABLE_NAME = "POLICY_MAPPING_RECORD";
+
+  public static final List<String> ALL_COLUMNS =
+      List.of(
+          "target_catalog_id",
+          "target_id",
+          "policy_type_code",
+          "policy_catalog_id",
+          "policy_id",
+          "parameters");
+
   // id of the catalog where target entity resides
   private long targetCatalogId;
 
@@ -155,14 +168,18 @@ public class ModelPolicyMappingRecord implements 
Converter<PolarisPolicyMappingR
   }
 
   @Override
-  public Map<String, Object> toMap() {
-    Map<String, Object> map = new HashMap<>();
+  public Map<String, Object> toMap(DatabaseType databaseType) {
+    Map<String, Object> map = new LinkedHashMap<>();
     map.put("target_catalog_id", targetCatalogId);
     map.put("target_id", targetId);
     map.put("policy_type_code", policyTypeCode);
     map.put("policy_catalog_id", policyCatalogId);
     map.put("policy_id", policyId);
-    map.put("parameters", parameters);
+    if (databaseType.equals(DatabaseType.POSTGRES)) {
+      map.put("parameters", toJsonbPGobject(this.getParameters()));
+    } else {
+      map.put("parameters", this.getParameters());
+    }
     return map;
   }
 }
diff --git 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelPrincipalAuthenticationData.java
 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelPrincipalAuthenticationData.java
index b6a82c12f..9013d6642 100644
--- 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelPrincipalAuthenticationData.java
+++ 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelPrincipalAuthenticationData.java
@@ -20,11 +20,23 @@ package 
org.apache.polaris.persistence.relational.jdbc.models;
 
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
+import org.apache.polaris.persistence.relational.jdbc.DatabaseType;
 
 public class ModelPrincipalAuthenticationData implements 
Converter<PolarisPrincipalSecrets> {
+  public static final String TABLE_NAME = "PRINCIPAL_AUTHENTICATION_DATA";
+
+  public static final List<String> ALL_COLUMNS =
+      List.of(
+          "principal_id",
+          "principal_client_id",
+          "main_secret_hash",
+          "secondary_secret_hash",
+          "secret_salt");
+
   // the id of the principal
   private long principalId;
 
@@ -78,8 +90,8 @@ public class ModelPrincipalAuthenticationData implements 
Converter<PolarisPrinci
   }
 
   @Override
-  public Map<String, Object> toMap() {
-    Map<String, Object> map = new HashMap<>();
+  public Map<String, Object> toMap(DatabaseType databaseType) {
+    Map<String, Object> map = new LinkedHashMap<>();
     map.put("principal_id", this.principalId);
     map.put("principal_client_id", this.principalClientId);
     map.put("main_secret_hash", this.mainSecretHash);
diff --git 
a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java
 
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java
index 1df5d6d5f..3018dc0e0 100644
--- 
a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java
+++ 
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java
@@ -46,7 +46,7 @@ public class 
AtomicMetastoreManagerWithJdbcBasePersistenceImplTest
   protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() {
     PolarisDiagnostics diagServices = new PolarisDefaultDiagServiceImpl();
     DatasourceOperations datasourceOperations =
-        new DatasourceOperations(createH2DataSource(), new 
H2JdbcConfiguration());
+        new DatasourceOperations(createH2DataSource(), DatabaseType.H2, new 
H2JdbcConfiguration());
     try {
       datasourceOperations.executeScript(
           String.format("%s/schema-v1.sql", DatabaseType.H2.getDisplayName()));
diff --git 
a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperationsTest.java
 
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperationsTest.java
index 5d4594e2c..bde721c3f 100644
--- 
a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperationsTest.java
+++ 
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperationsTest.java
@@ -28,9 +28,10 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.time.Instant;
+import java.util.List;
 import java.util.Optional;
 import javax.sql.DataSource;
 import 
org.apache.polaris.persistence.relational.jdbc.DatasourceOperations.Operation;
@@ -47,7 +48,7 @@ public class DatasourceOperationsTest {
 
   @Mock private Connection mockConnection;
 
-  @Mock private Statement mockStatement;
+  @Mock private PreparedStatement mockPreparedStatement;
 
   @Mock private RelationalJdbcConfiguration relationalJdbcConfiguration;
 
@@ -56,29 +57,31 @@ public class DatasourceOperationsTest {
   private DatasourceOperations datasourceOperations;
 
   @BeforeEach
-  void setUp() throws Exception {
-    datasourceOperations = new DatasourceOperations(mockDataSource, 
relationalJdbcConfiguration);
+  void setUp() {
+    datasourceOperations =
+        new DatasourceOperations(mockDataSource, DatabaseType.H2, 
relationalJdbcConfiguration);
   }
 
   @Test
   void testExecuteUpdate_success() throws Exception {
     when(mockDataSource.getConnection()).thenReturn(mockConnection);
-    when(mockConnection.createStatement()).thenReturn(mockStatement);
-    String query = "UPDATE users SET active = true";
-    when(mockStatement.executeUpdate(query)).thenReturn(1);
+    QueryGenerator.PreparedQuery query =
+        new QueryGenerator.PreparedQuery("UPDATE users SET active = ?", 
List.of());
+    
when(mockConnection.prepareStatement(query.sql())).thenReturn(mockPreparedStatement);
+    when(mockPreparedStatement.executeUpdate()).thenReturn(1);
 
     int result = datasourceOperations.executeUpdate(query);
 
     assertEquals(1, result);
-    verify(mockStatement).executeUpdate(query);
   }
 
   @Test
   void testExecuteUpdate_failure() throws Exception {
     when(mockDataSource.getConnection()).thenReturn(mockConnection);
-    when(mockConnection.createStatement()).thenReturn(mockStatement);
-    String query = "INVALID SQL";
-    when(mockStatement.executeUpdate(query)).thenThrow(new 
SQLException("demo", "42P07"));
+    QueryGenerator.PreparedQuery query = new 
QueryGenerator.PreparedQuery("INVALID SQL", List.of());
+    
when(mockConnection.prepareStatement(query.sql())).thenReturn(mockPreparedStatement);
+
+    when(mockPreparedStatement.executeUpdate()).thenThrow(new 
SQLException("demo", "42P07"));
 
     assertThrows(SQLException.class, () -> 
datasourceOperations.executeUpdate(query));
   }
@@ -86,9 +89,10 @@ public class DatasourceOperationsTest {
   @Test
   void testExecuteSelect_exception() throws Exception {
     when(mockDataSource.getConnection()).thenReturn(mockConnection);
-    when(mockConnection.createStatement()).thenReturn(mockStatement);
-    String query = "SELECT * FROM users";
-    when(mockStatement.executeQuery(query)).thenThrow(new SQLException("demo", 
"42P07"));
+    QueryGenerator.PreparedQuery query =
+        new QueryGenerator.PreparedQuery("SELECT * FROM users", List.of());
+    
when(mockConnection.prepareStatement(query.sql())).thenReturn(mockPreparedStatement);
+    when(mockPreparedStatement.executeQuery()).thenThrow(new 
SQLException("demo", "42P07"));
 
     assertThrows(
         SQLException.class, () -> datasourceOperations.executeSelect(query, 
new ModelEntity()));
@@ -97,8 +101,7 @@ public class DatasourceOperationsTest {
   @Test
   void testRunWithinTransaction_commit() throws Exception {
     when(mockDataSource.getConnection()).thenReturn(mockConnection);
-    when(mockConnection.createStatement()).thenReturn(mockStatement);
-    DatasourceOperations.TransactionCallback callback = statement -> true;
+    DatasourceOperations.TransactionCallback callback = connection -> true;
     when(mockConnection.getAutoCommit()).thenReturn(true);
     datasourceOperations.runWithinTransaction(callback);
     verify(mockConnection).setAutoCommit(true);
@@ -111,8 +114,7 @@ public class DatasourceOperationsTest {
   @Test
   void testRunWithinTransaction_rollback() throws Exception {
     when(mockDataSource.getConnection()).thenReturn(mockConnection);
-    when(mockConnection.createStatement()).thenReturn(mockStatement);
-    DatasourceOperations.TransactionCallback callback = statement -> false;
+    DatasourceOperations.TransactionCallback callback = connection -> false;
 
     datasourceOperations.runWithinTransaction(callback);
 
@@ -122,9 +124,8 @@ public class DatasourceOperationsTest {
   @Test
   void testRunWithinTransaction_exceptionTriggersRollback() throws Exception {
     when(mockDataSource.getConnection()).thenReturn(mockConnection);
-    when(mockConnection.createStatement()).thenReturn(mockStatement);
     DatasourceOperations.TransactionCallback callback =
-        statement -> {
+        connection -> {
           throw new SQLException("Boom");
         };
 
diff --git 
a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java
 
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java
index 58688e8f9..d1b71b841 100644
--- 
a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java
+++ 
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java
@@ -23,16 +23,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import org.apache.polaris.core.entity.PolarisEntityCore;
 import org.apache.polaris.core.entity.PolarisEntityId;
 import org.apache.polaris.persistence.relational.jdbc.models.ModelEntity;
-import org.apache.polaris.persistence.relational.jdbc.models.ModelGrantRecord;
-import 
org.apache.polaris.persistence.relational.jdbc.models.ModelPrincipalAuthenticationData;
 import org.junit.jupiter.api.Test;
 
 public class QueryGeneratorTest {
@@ -40,13 +34,17 @@ public class QueryGeneratorTest {
   private static final String REALM_ID = "testRealm";
 
   @Test
-  void testGenerateSelectQuery_withMapWhereClause() {
+  void testGenerateSelectQuery_withMaQueryGeneratorpWhereClause() {
     Map<String, Object> whereClause = new HashMap<>();
     whereClause.put("name", "testEntity");
     whereClause.put("entity_version", 1);
     String expectedQuery =
-        "SELECT entity_version, to_purge_timestamp, internal_properties, 
catalog_id, purge_timestamp, sub_type_code, create_timestamp, 
last_update_timestamp, parent_id, name, id, drop_timestamp, properties, 
grant_records_version, type_code FROM POLARIS_SCHEMA.ENTITIES WHERE 
entity_version = 1 AND name = 'testEntity'";
-    assertEquals(expectedQuery, QueryGenerator.generateSelectQuery(new 
ModelEntity(), whereClause));
+        "SELECT id, catalog_id, parent_id, type_code, name, entity_version, 
sub_type_code, create_timestamp, drop_timestamp, purge_timestamp, 
to_purge_timestamp, last_update_timestamp, properties, internal_properties, 
grant_records_version FROM POLARIS_SCHEMA.ENTITIES WHERE entity_version = ? AND 
name = ?";
+    assertEquals(
+        expectedQuery,
+        QueryGenerator.generateSelectQuery(
+                ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, whereClause)
+            .sql());
   }
 
   @Test
@@ -55,18 +53,22 @@ public class QueryGeneratorTest {
     when(entity.getId()).thenReturn(1L);
     when(entity.getCatalogId()).thenReturn(123L);
     String expectedQuery =
-        "DELETE FROM POLARIS_SCHEMA.GRANT_RECORDS WHERE (grantee_id = 1 AND 
grantee_catalog_id = 123 OR securable_id = 1 AND securable_catalog_id = 123) 
AND realm_id = 'testRealm'";
+        "DELETE FROM POLARIS_SCHEMA.GRANT_RECORDS WHERE (\n"
+            + "    (grantee_id = ? AND grantee_catalog_id = ?) OR\n"
+            + "    (securable_id = ? AND securable_catalog_id = ?)\n"
+            + ") AND realm_id = ?";
     assertEquals(
-        expectedQuery, 
QueryGenerator.generateDeleteQueryForEntityGrantRecords(entity, REALM_ID));
+        expectedQuery,
+        QueryGenerator.generateDeleteQueryForEntityGrantRecords(entity, 
REALM_ID).sql());
   }
 
   @Test
   void testGenerateSelectQueryWithEntityIds_singleId() {
     List<PolarisEntityId> entityIds = Collections.singletonList(new 
PolarisEntityId(123L, 1L));
     String expectedQuery =
-        "SELECT entity_version, to_purge_timestamp, internal_properties, 
catalog_id, purge_timestamp, sub_type_code, create_timestamp, 
last_update_timestamp, parent_id, name, id, drop_timestamp, properties, 
grant_records_version, type_code FROM POLARIS_SCHEMA.ENTITIES WHERE 
(catalog_id, id) IN ((123, 1)) AND realm_id = 'testRealm'";
+        "SELECT id, catalog_id, parent_id, type_code, name, entity_version, 
sub_type_code, create_timestamp, drop_timestamp, purge_timestamp, 
to_purge_timestamp, last_update_timestamp, properties, internal_properties, 
grant_records_version FROM POLARIS_SCHEMA.ENTITIES WHERE (catalog_id, id) IN 
((?, ?)) AND realm_id = ?";
     assertEquals(
-        expectedQuery, 
QueryGenerator.generateSelectQueryWithEntityIds(REALM_ID, entityIds));
+        expectedQuery, 
QueryGenerator.generateSelectQueryWithEntityIds(REALM_ID, entityIds).sql());
   }
 
   @Test
@@ -74,9 +76,9 @@ public class QueryGeneratorTest {
     List<PolarisEntityId> entityIds =
         Arrays.asList(new PolarisEntityId(123L, 1L), new PolarisEntityId(456L, 
2L));
     String expectedQuery =
-        "SELECT entity_version, to_purge_timestamp, internal_properties, 
catalog_id, purge_timestamp, sub_type_code, create_timestamp, 
last_update_timestamp, parent_id, name, id, drop_timestamp, properties, 
grant_records_version, type_code FROM POLARIS_SCHEMA.ENTITIES WHERE 
(catalog_id, id) IN ((123, 1),(456, 2)) AND realm_id = 'testRealm'";
+        "SELECT id, catalog_id, parent_id, type_code, name, entity_version, 
sub_type_code, create_timestamp, drop_timestamp, purge_timestamp, 
to_purge_timestamp, last_update_timestamp, properties, internal_properties, 
grant_records_version FROM POLARIS_SCHEMA.ENTITIES WHERE (catalog_id, id) IN 
((?, ?), (?, ?)) AND realm_id = ?";
     assertEquals(
-        expectedQuery, 
QueryGenerator.generateSelectQueryWithEntityIds(REALM_ID, entityIds));
+        expectedQuery, 
QueryGenerator.generateSelectQueryWithEntityIds(REALM_ID, entityIds).sql());
   }
 
   @Test
@@ -84,23 +86,37 @@ public class QueryGeneratorTest {
     List<PolarisEntityId> entityIds = Collections.emptyList();
     assertThrows(
         IllegalArgumentException.class,
-        () -> QueryGenerator.generateSelectQueryWithEntityIds(REALM_ID, 
entityIds));
+        () -> QueryGenerator.generateSelectQueryWithEntityIds(REALM_ID, 
entityIds).sql());
   }
 
   @Test
   void testGenerateInsertQuery_nonNullFields() {
     ModelEntity entity = 
ModelEntity.builder().name("test").entityVersion(1).build();
     String expectedQuery =
-        "INSERT INTO POLARIS_SCHEMA.ENTITIES (entity_version, 
to_purge_timestamp, internal_properties, catalog_id, purge_timestamp, 
sub_type_code, create_timestamp, last_update_timestamp, parent_id, name, id, 
drop_timestamp, properties, grant_records_version, type_code, realm_id) VALUES 
('1', '0', '{}', '0', '0', '0', '0', '0', '0', 'test', '0', '0', '{}', '0', 
'0', 'testRealm')";
-    assertEquals(expectedQuery, QueryGenerator.generateInsertQuery(entity, 
REALM_ID));
+        "INSERT INTO POLARIS_SCHEMA.ENTITIES (id, catalog_id, parent_id, 
type_code, name, entity_version, sub_type_code, create_timestamp, 
drop_timestamp, purge_timestamp, to_purge_timestamp, last_update_timestamp, 
properties, internal_properties, grant_records_version, realm_id) VALUES (?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+    assertEquals(
+        expectedQuery,
+        QueryGenerator.generateInsertQuery(
+                ModelEntity.ALL_COLUMNS,
+                ModelEntity.TABLE_NAME,
+                entity.toMap(DatabaseType.H2).values().stream().toList(),
+                REALM_ID)
+            .sql());
   }
 
   @Test
   void testGenerateInsertQuery_nullFields() {
     ModelEntity entity = ModelEntity.builder().name("test").build();
     String expectedQuery =
-        "INSERT INTO POLARIS_SCHEMA.ENTITIES (entity_version, 
to_purge_timestamp, internal_properties, catalog_id, purge_timestamp, 
sub_type_code, create_timestamp, last_update_timestamp, parent_id, name, id, 
drop_timestamp, properties, grant_records_version, type_code, realm_id) VALUES 
('0', '0', '{}', '0', '0', '0', '0', '0', '0', 'test', '0', '0', '{}', '0', 
'0', 'testRealm')";
-    assertEquals(expectedQuery, QueryGenerator.generateInsertQuery(entity, 
REALM_ID));
+        "INSERT INTO POLARIS_SCHEMA.ENTITIES (id, catalog_id, parent_id, 
type_code, name, entity_version, sub_type_code, create_timestamp, 
drop_timestamp, purge_timestamp, to_purge_timestamp, last_update_timestamp, 
properties, internal_properties, grant_records_version, realm_id) VALUES (?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+    assertEquals(
+        expectedQuery,
+        QueryGenerator.generateInsertQuery(
+                ModelEntity.ALL_COLUMNS,
+                ModelEntity.TABLE_NAME,
+                entity.toMap(DatabaseType.H2).values().stream().toList(),
+                REALM_ID)
+            .sql());
   }
 
   @Test
@@ -109,8 +125,15 @@ public class QueryGeneratorTest {
     Map<String, Object> whereClause = new HashMap<>();
     whereClause.put("id", 123L);
     String expectedQuery =
-        "UPDATE POLARIS_SCHEMA.ENTITIES SET entity_version = '2', 
to_purge_timestamp = '0', internal_properties = '{}', catalog_id = '0', 
purge_timestamp = '0', sub_type_code = '0', create_timestamp = '0', 
last_update_timestamp = '0', parent_id = '0', name = 'newName', id = '0', 
drop_timestamp = '0', properties = '{}', grant_records_version = '0', type_code 
= '0' WHERE id = 123";
-    assertEquals(expectedQuery, QueryGenerator.generateUpdateQuery(entity, 
whereClause));
+        "UPDATE POLARIS_SCHEMA.ENTITIES SET id = ?, catalog_id = ?, parent_id 
= ?, type_code = ?, name = ?, entity_version = ?, sub_type_code = ?, 
create_timestamp = ?, drop_timestamp = ?, purge_timestamp = ?, 
to_purge_timestamp = ?, last_update_timestamp = ?, properties = ?, 
internal_properties = ?, grant_records_version = ? WHERE id = ?";
+    assertEquals(
+        expectedQuery,
+        QueryGenerator.generateUpdateQuery(
+                ModelEntity.ALL_COLUMNS,
+                ModelEntity.TABLE_NAME,
+                entity.toMap(DatabaseType.H2).values().stream().toList(),
+                whereClause)
+            .sql());
   }
 
   @Test
@@ -119,93 +142,71 @@ public class QueryGeneratorTest {
     Map<String, Object> whereClause = new HashMap<>();
     whereClause.put("id", 123L);
     String expectedQuery =
-        "UPDATE POLARIS_SCHEMA.ENTITIES SET entity_version = '0', 
to_purge_timestamp = '0', internal_properties = '{}', catalog_id = '0', 
purge_timestamp = '0', sub_type_code = '0', create_timestamp = '0', 
last_update_timestamp = '0', parent_id = '0', name = 'newName', id = '0', 
drop_timestamp = '0', properties = '{}', grant_records_version = '0', type_code 
= '0' WHERE id = 123";
-    assertEquals(expectedQuery, QueryGenerator.generateUpdateQuery(entity, 
whereClause));
+        "UPDATE POLARIS_SCHEMA.ENTITIES SET id = ?, catalog_id = ?, parent_id 
= ?, type_code = ?, name = ?, entity_version = ?, sub_type_code = ?, 
create_timestamp = ?, drop_timestamp = ?, purge_timestamp = ?, 
to_purge_timestamp = ?, last_update_timestamp = ?, properties = ?, 
internal_properties = ?, grant_records_version = ? WHERE id = ?";
+    assertEquals(
+        expectedQuery,
+        QueryGenerator.generateUpdateQuery(
+                ModelEntity.ALL_COLUMNS,
+                ModelEntity.TABLE_NAME,
+                entity.toMap(DatabaseType.H2).values().stream().toList(),
+                whereClause)
+            .sql());
   }
 
   @Test
   void testGenerateDeleteQuery_withMapWhereClause() {
     Map<String, Object> whereClause = new HashMap<>();
     whereClause.put("name", "oldName");
-    String expectedQuery = "DELETE FROM POLARIS_SCHEMA.ENTITIES WHERE name = 
'oldName'";
-    assertEquals(expectedQuery, 
QueryGenerator.generateDeleteQuery(ModelEntity.class, whereClause));
+    String expectedQuery = "DELETE FROM POLARIS_SCHEMA.ENTITIES WHERE name = 
?";
+    assertEquals(
+        expectedQuery,
+        QueryGenerator.generateDeleteQuery(
+                ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, whereClause)
+            .sql());
   }
 
   @Test
   void testGenerateDeleteQuery_withStringWhereClause() {
-    String whereClause = " WHERE name = 'oldName'";
-    String expectedQuery = "DELETE FROM POLARIS_SCHEMA.ENTITIES WHERE name = 
'oldName'";
-    assertEquals(expectedQuery, 
QueryGenerator.generateDeleteQuery(ModelEntity.class, whereClause));
-  }
-
-  @Test
-  void testGenerateDeleteAll() {
-    String expectedQuery =
-        "DELETE FROM POLARIS_SCHEMA.ENTITIES WHERE 1 = 1 AND realm_id = 
'testRealm'";
-    assertEquals(expectedQuery, 
QueryGenerator.generateDeleteAll(ModelEntity.class, REALM_ID));
+    String expectedQuery = "DELETE FROM POLARIS_SCHEMA.ENTITIES WHERE name = 
?";
+    assertEquals(
+        expectedQuery,
+        QueryGenerator.generateDeleteQuery(
+                ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, 
Map.of("name", "oldName"))
+            .sql());
   }
 
   @Test
   void testGenerateDeleteQuery_byObject() {
     ModelEntity entityToDelete = 
ModelEntity.builder().name("test").entityVersion(1).build();
+    Map<String, Object> objMap = entityToDelete.toMap(DatabaseType.H2);
+    objMap.put("realm_id", REALM_ID);
     String expectedQuery =
-        "DELETE FROM POLARIS_SCHEMA.ENTITIES WHERE entity_version = 1 AND 
to_purge_timestamp = 0 AND realm_id = 'testRealm' AND internal_properties = 
'{}' AND catalog_id = 0 AND purge_timestamp = 0 AND sub_type_code = 0 AND 
create_timestamp = 0 AND last_update_timestamp = 0 AND parent_id = 0 AND name = 
'test' AND id = 0 AND drop_timestamp = 0 AND properties = '{}' AND 
grant_records_version = 0 AND type_code = 0";
-    assertEquals(expectedQuery, 
QueryGenerator.generateDeleteQuery(entityToDelete, REALM_ID));
+        "DELETE FROM POLARIS_SCHEMA.ENTITIES WHERE id = ? AND catalog_id = ? 
AND parent_id = ? AND type_code = ? AND name = ? AND entity_version = ? AND 
sub_type_code = ? AND create_timestamp = ? AND drop_timestamp = ? AND 
purge_timestamp = ? AND to_purge_timestamp = ? AND last_update_timestamp = ? 
AND properties = ? AND internal_properties = ? AND grant_records_version = ? 
AND realm_id = ?";
+    assertEquals(
+        expectedQuery,
+        QueryGenerator.generateDeleteQuery(ModelEntity.ALL_COLUMNS, 
ModelEntity.TABLE_NAME, objMap)
+            .sql());
   }
 
   @Test
   void testGenerateDeleteQuery_byObject_nullValue() {
     ModelEntity entityToDelete = 
ModelEntity.builder().name("test").dropTimestamp(0L).build();
+    Map<String, Object> objMap = entityToDelete.toMap(DatabaseType.H2);
+    objMap.put("realm_id", REALM_ID);
     String expectedQuery =
-        "DELETE FROM POLARIS_SCHEMA.ENTITIES WHERE entity_version = 0 AND 
to_purge_timestamp = 0 AND realm_id = 'testRealm' AND internal_properties = 
'{}' AND catalog_id = 0 AND purge_timestamp = 0 AND sub_type_code = 0 AND 
create_timestamp = 0 AND last_update_timestamp = 0 AND parent_id = 0 AND name = 
'test' AND id = 0 AND drop_timestamp = 0 AND properties = '{}' AND 
grant_records_version = 0 AND type_code = 0";
-    assertEquals(expectedQuery, 
QueryGenerator.generateDeleteQuery(entityToDelete, REALM_ID));
-  }
-
-  @Test
-  void testGetTableName_ModelEntity() {
-    assertEquals("POLARIS_SCHEMA.ENTITIES", 
QueryGenerator.getTableName(ModelEntity.class));
-  }
-
-  @Test
-  void testGetTableName_ModelGrantRecord() {
-    assertEquals(
-        "POLARIS_SCHEMA.GRANT_RECORDS", 
QueryGenerator.getTableName(ModelGrantRecord.class));
-  }
-
-  @Test
-  void testGetTableName_ModelPrincipalAuthenticationData() {
-    assertEquals(
-        "POLARIS_SCHEMA.PRINCIPAL_AUTHENTICATION_DATA",
-        QueryGenerator.getTableName(ModelPrincipalAuthenticationData.class));
-  }
-
-  @Test
-  void testGetTableName_UnsupportedClass() {
-    class UnsupportedEntity {}
-    assertThrows(
-        IllegalArgumentException.class, () -> 
QueryGenerator.getTableName(UnsupportedEntity.class));
-  }
-
-  @Test
-  void testGenerateSelectQuery_withFilter() {
-    String filter = " WHERE name = 'testEntity'";
-    String expectedQuery =
-        "SELECT entity_version, to_purge_timestamp, internal_properties, 
catalog_id, purge_timestamp, sub_type_code, create_timestamp, 
last_update_timestamp, parent_id, name, id, drop_timestamp, properties, 
grant_records_version, type_code FROM POLARIS_SCHEMA.ENTITIES WHERE name = 
'testEntity'";
-    // Note: The private generateSelectQuery is called by the public one, so 
testing the public one
-    // with a filter is sufficient.
-    // We don't need to directly test the private one unless there's very 
specific logic not
-    // covered.
-    Map<String, Object> emptyWhereClause = Collections.emptyMap();
+        "DELETE FROM POLARIS_SCHEMA.ENTITIES WHERE id = ? AND catalog_id = ? 
AND parent_id = ? AND type_code = ? AND name = ? AND entity_version = ? AND 
sub_type_code = ? AND create_timestamp = ? AND drop_timestamp = ? AND 
purge_timestamp = ? AND to_purge_timestamp = ? AND last_update_timestamp = ? 
AND properties = ? AND internal_properties = ? AND grant_records_version = ? 
AND realm_id = ?";
     assertEquals(
         expectedQuery,
-        QueryGenerator.generateSelectQuery(new ModelEntity(), " WHERE name = 
'testEntity'"));
+        QueryGenerator.generateDeleteQuery(ModelEntity.ALL_COLUMNS, 
ModelEntity.TABLE_NAME, objMap)
+            .sql());
   }
 
   @Test
   void testGenerateWhereClause_singleCondition() {
     Map<String, Object> whereClause = new HashMap<>();
     whereClause.put("name", "test");
-    assertEquals(" WHERE name = 'test'", 
QueryGenerator.generateWhereClause(whereClause));
+    assertEquals(
+        " WHERE name = ?", QueryGenerator.generateWhereClause(Set.of("name"), 
whereClause).sql());
   }
 
   @Test
@@ -214,12 +215,13 @@ public class QueryGeneratorTest {
     whereClause.put("name", "test");
     whereClause.put("version", 1);
     assertEquals(
-        " WHERE name = 'test' AND version = 1", 
QueryGenerator.generateWhereClause(whereClause));
+        " WHERE name = ? AND version = ?",
+        QueryGenerator.generateWhereClause(Set.of("name", "version"), 
whereClause).sql());
   }
 
   @Test
   void testGenerateWhereClause_emptyMap() {
     Map<String, Object> whereClause = Collections.emptyMap();
-    assertEquals("", QueryGenerator.generateWhereClause(whereClause));
+    assertEquals("", QueryGenerator.generateWhereClause(Set.of(), 
whereClause).sql());
   }
 }


Reply via email to