This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new c0fde0fec07 HIVE-29318: PostgreSQL requires every NULL parameter to have an explicit SQL type (#6186)
c0fde0fec07 is described below

commit c0fde0fec0729c986c62f7b13aaebaab1be985ef
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Sun Nov 16 17:46:00 2025 +0100

    HIVE-29318: PostgreSQL requires every NULL parameter to have an explicit SQL type (#6186)
---
 .../txn/jdbc/functions/MarkCleanedFunction.java    | 110 ++++++++++++---------
 1 file changed, 65 insertions(+), 45 deletions(-)

diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/MarkCleanedFunction.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/MarkCleanedFunction.java
index 9f985d7c459..2790b8badb4 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/MarkCleanedFunction.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/MarkCleanedFunction.java
@@ -54,22 +54,27 @@ public Void execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExcepti
       param = new MapSqlParameterSource()
           .addValue("id", info.id)
           .addValue("succeeded", Character.toString(SUCCEEDED_STATE), 
Types.CHAR);
-      jdbcTemplate.update(
-          "INSERT INTO \"COMPLETED_COMPACTIONS\"(\"CC_ID\", \"CC_DATABASE\", "
-              + "\"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", 
\"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", "
-              + "\"CC_START\", \"CC_END\", \"CC_RUN_AS\", 
\"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", "
-              + "\"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\", 
\"CC_ENQUEUE_TIME\", "
-              + "\"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\", 
\"CC_INITIATOR_VERSION\", "
-              + "\"CC_NEXT_TXN_ID\", \"CC_TXN_ID\", \"CC_COMMIT_TIME\", 
\"CC_POOL_NAME\", \"CC_NUMBER_OF_BUCKETS\","
-              + "\"CC_ORDER_BY\") "
-              + "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", 
\"CQ_PARTITION\", "
-              + ":succeeded, \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", 
\"CQ_WORKER_ID\", \"CQ_START\", "
-              + getEpochFn(jdbcResource.getDatabaseProduct()) + ", 
\"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", "
-              + "\"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\", 
\"CQ_ENQUEUE_TIME\", "
-              + "\"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", 
\"CQ_INITIATOR_VERSION\", "
-              + "\"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\", \"CQ_COMMIT_TIME\", 
\"CQ_POOL_NAME\", \"CQ_NUMBER_OF_BUCKETS\", "
-              + "\"CQ_ORDER_BY\" "
-              + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = :id", param);
+      jdbcTemplate.update("""
+          INSERT INTO "COMPLETED_COMPACTIONS"(
+            "CC_ID", "CC_DATABASE", "CC_TABLE", "CC_PARTITION",
+            "CC_STATE", "CC_TYPE", "CC_TBLPROPERTIES", "CC_WORKER_ID",
+            "CC_START", "CC_END", "CC_RUN_AS", "CC_HIGHEST_WRITE_ID", 
"CC_META_INFO",
+            "CC_HADOOP_JOB_ID", "CC_ERROR_MESSAGE", "CC_ENQUEUE_TIME",
+            "CC_WORKER_VERSION", "CC_INITIATOR_ID", "CC_INITIATOR_VERSION",
+            "CC_NEXT_TXN_ID", "CC_TXN_ID", "CC_COMMIT_TIME", "CC_POOL_NAME", 
"CC_NUMBER_OF_BUCKETS",
+            "CC_ORDER_BY")
+          SELECT
+            "CQ_ID", "CQ_DATABASE", "CQ_TABLE", "CQ_PARTITION",
+            :succeeded, "CQ_TYPE", "CQ_TBLPROPERTIES", "CQ_WORKER_ID",
+            "CQ_START", %s, "CQ_RUN_AS", "CQ_HIGHEST_WRITE_ID", "CQ_META_INFO",
+            "CQ_HADOOP_JOB_ID", "CQ_ERROR_MESSAGE", "CQ_ENQUEUE_TIME",
+            "CQ_WORKER_VERSION", "CQ_INITIATOR_ID", "CQ_INITIATOR_VERSION",
+            "CQ_NEXT_TXN_ID", "CQ_TXN_ID", "CQ_COMMIT_TIME", "CQ_POOL_NAME", 
"CQ_NUMBER_OF_BUCKETS",
+            "CQ_ORDER_BY"
+          FROM "COMPACTION_QUEUE"
+          WHERE "CQ_ID" = :id""".formatted(
+              getEpochFn(jdbcResource.getDatabaseProduct())),
+          param);
     }
     
     /* Remove compaction queue record corresponding to the compaction which 
has been successful as well as
@@ -84,22 +89,30 @@ public Void execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExcepti
       // Remove entries from completed_txn_components as well, so we don't 
start looking there
       // again but only up to the highest write ID include in this compaction 
job.
       //highestWriteId will be NULL in upgrade scenarios
-      String query = "DELETE FROM \"COMPLETED_TXN_COMPONENTS\" WHERE 
\"CTC_DATABASE\" = :db AND \"CTC_TABLE\" = :table";
+      String deleteQuery = """
+          DELETE FROM "COMPLETED_TXN_COMPONENTS" WHERE "CTC_DATABASE" = :db 
AND "CTC_TABLE" = :table
+          """;
       if (info.partName != null) {
-        query += " AND \"CTC_PARTITION\" = :partition";
+        deleteQuery += """ 
+            AND "CTC_PARTITION" = :partition
+            """;
       }
       if (info.highestWriteId != 0) {
-        query += " AND \"CTC_WRITEID\" <= :writeId";
+        deleteQuery += """
+            AND "CTC_WRITEID" <= :writeId
+            """;
       }
       param = new MapSqlParameterSource()
           .addValue("db", info.dbname)
           .addValue("table", info.tableName)
           .addValue("writeId", info.highestWriteId);
+
       if (info.partName != null) {
         param.addValue("partition", info.partName);
       }
-      LOG.debug("Going to execute update <{}>", query);
-      int updCount = jdbcTemplate.update(query, param);
+      LOG.debug("Going to execute update <{}>", deleteQuery);
+      int updCount = jdbcTemplate.update(deleteQuery, param);
+
       if (updCount < 1) {
         LOG.warn("Expected to remove at least one row from 
completed_txn_components when " +
             "marking compaction entry as clean!");
@@ -119,31 +132,34 @@ private void removeTxnComponents(CompactionInfo info, MultiDataSourceJdbcResourc
      * aborted TXN_COMPONENTS above tc_writeid (and consequently about aborted 
txns).
      * See {@link ql.txn.compactor.Cleaner.removeFiles()}
      */
-
     MapSqlParameterSource params = new MapSqlParameterSource()
         .addValue("state", TxnStatus.ABORTED.getSqlConst(), Types.CHAR)
         .addValue("db", info.dbname)
         .addValue("table", info.tableName)
-        .addValue("partition", info.partName);
+        .addValue("partition", info.partName, Types.VARCHAR);
+
+    String deleteQuery = """
+        DELETE FROM "TXN_COMPONENTS"
+        WHERE "TC_TXNID" IN (
+            SELECT "TXN_ID" FROM "TXNS" WHERE "TXN_STATE" = :state
+          )
+          AND "TC_DATABASE" = :db AND "TC_TABLE" = :table
+          AND (:partition is NULL OR "TC_PARTITION" = :partition)
+          AND "TC_WRITEID" %s
+        """;
 
     int totalCount = 0;
     if (!info.hasUncompactedAborts && info.highestWriteId != 0) {
       totalCount = jdbcResource.getJdbcTemplate().update(
-          "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" IN ( "
-              + "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = :state) 
"
-              + "AND \"TC_DATABASE\" = :db AND \"TC_TABLE\" = :table "
-              + "AND (:partition is NULL OR \"TC_PARTITION\" = :partition) "
-              + "AND \"TC_WRITEID\" <= :id",
+          deleteQuery.formatted("<= :id"),
           params.addValue("id", info.highestWriteId));
+
     } else if (CollectionUtils.isNotEmpty(info.writeIds)) {
-      params.addValue("ids", new ArrayList<>(info.writeIds));
       totalCount = jdbcResource.execute(new InClauseBatchCommand<>(
-          "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" IN ( "
-              + "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = :state) 
"
-              + "AND \"TC_DATABASE\" = :db AND \"TC_TABLE\" = :table "
-              + "AND (:partition is NULL OR \"TC_PARTITION\" = :partition) "
-              + "AND \"TC_WRITEID\" IN (:ids)", 
-          params, "ids", Long::compareTo));
+          deleteQuery.formatted("IN (:ids)"),
+          params.addValue("ids", new ArrayList<>(info.writeIds)),
+          "ids",
+          Long::compareTo));
     }
     LOG.debug("Removed {} records from txn_components", totalCount);
   }
@@ -156,20 +172,24 @@ private void removeCompactionAndAbortRetryEntries(CompactionInfo info, NamedPara
     }
 
     MapSqlParameterSource params = new MapSqlParameterSource("id", info.id);
-    String query;
-    if (info.isAbortedTxnCleanup()) {
-      query = "DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = :id";
-    } else {
-      query = "DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = :id " +
-          "OR (\"CQ_DATABASE\" = :db AND \"CQ_TABLE\" = :table AND \"CQ_TYPE\" 
= :type AND (:partition is NULL OR \"CQ_PARTITION\" = :partition))";
+    String deleteQuery = """
+        DELETE FROM "COMPACTION_QUEUE" WHERE "CQ_ID" = :id
+        """;
+    if (!info.isAbortedTxnCleanup()) {
+      deleteQuery += """
+          OR ("CQ_DATABASE" = :db AND "CQ_TABLE" = :table
+            AND (:partition is NULL OR "CQ_PARTITION" = :partition)
+            AND "CQ_TYPE" = :type)
+          """;
       params.addValue("db", info.dbname)
           .addValue("table", info.tableName)
-          .addValue("type", 
Character.toString(TxnStore.ABORT_TXN_CLEANUP_TYPE), Types.CHAR)
-          .addValue("partition", info.partName, Types.VARCHAR);
+          .addValue("partition", info.partName, Types.VARCHAR)
+          .addValue("type", 
Character.toString(TxnStore.ABORT_TXN_CLEANUP_TYPE), Types.CHAR);
     }
 
-    LOG.debug("Going to execute update <{}>", query);
-    int rc = jdbcTemplate.update(query, params);
+    LOG.debug("Going to execute update <{}>", deleteQuery);
+    int rc = jdbcTemplate.update(deleteQuery, params);
+
     LOG.debug("Removed {} records in COMPACTION_QUEUE", rc);
   }
 

Reply via email to