This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new c0fde0fec07 HIVE-29318: PostgreSQL requires every NULL parameter to
have an explicit SQL type (#6186)
c0fde0fec07 is described below
commit c0fde0fec0729c986c62f7b13aaebaab1be985ef
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Sun Nov 16 17:46:00 2025 +0100
HIVE-29318: PostgreSQL requires every NULL parameter to have an explicit
SQL type (#6186)
---
.../txn/jdbc/functions/MarkCleanedFunction.java | 110 ++++++++++++---------
1 file changed, 65 insertions(+), 45 deletions(-)
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/MarkCleanedFunction.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/MarkCleanedFunction.java
index 9f985d7c459..2790b8badb4 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/MarkCleanedFunction.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/MarkCleanedFunction.java
@@ -54,22 +54,27 @@ public Void execute(MultiDataSourceJdbcResource
jdbcResource) throws MetaExcepti
param = new MapSqlParameterSource()
.addValue("id", info.id)
.addValue("succeeded", Character.toString(SUCCEEDED_STATE),
Types.CHAR);
- jdbcTemplate.update(
- "INSERT INTO \"COMPLETED_COMPACTIONS\"(\"CC_ID\", \"CC_DATABASE\", "
- + "\"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\",
\"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", "
- + "\"CC_START\", \"CC_END\", \"CC_RUN_AS\",
\"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", "
- + "\"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\",
\"CC_ENQUEUE_TIME\", "
- + "\"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\",
\"CC_INITIATOR_VERSION\", "
- + "\"CC_NEXT_TXN_ID\", \"CC_TXN_ID\", \"CC_COMMIT_TIME\",
\"CC_POOL_NAME\", \"CC_NUMBER_OF_BUCKETS\","
- + "\"CC_ORDER_BY\") "
- + "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\",
\"CQ_PARTITION\", "
- + ":succeeded, \"CQ_TYPE\", \"CQ_TBLPROPERTIES\",
\"CQ_WORKER_ID\", \"CQ_START\", "
- + getEpochFn(jdbcResource.getDatabaseProduct()) + ",
\"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", "
- + "\"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\",
\"CQ_ENQUEUE_TIME\", "
- + "\"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\",
\"CQ_INITIATOR_VERSION\", "
- + "\"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\", \"CQ_COMMIT_TIME\",
\"CQ_POOL_NAME\", \"CQ_NUMBER_OF_BUCKETS\", "
- + "\"CQ_ORDER_BY\" "
- + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = :id", param);
+ jdbcTemplate.update("""
+ INSERT INTO "COMPLETED_COMPACTIONS"(
+ "CC_ID", "CC_DATABASE", "CC_TABLE", "CC_PARTITION",
+ "CC_STATE", "CC_TYPE", "CC_TBLPROPERTIES", "CC_WORKER_ID",
+ "CC_START", "CC_END", "CC_RUN_AS", "CC_HIGHEST_WRITE_ID",
"CC_META_INFO",
+ "CC_HADOOP_JOB_ID", "CC_ERROR_MESSAGE", "CC_ENQUEUE_TIME",
+ "CC_WORKER_VERSION", "CC_INITIATOR_ID", "CC_INITIATOR_VERSION",
+ "CC_NEXT_TXN_ID", "CC_TXN_ID", "CC_COMMIT_TIME", "CC_POOL_NAME",
"CC_NUMBER_OF_BUCKETS",
+ "CC_ORDER_BY")
+ SELECT
+ "CQ_ID", "CQ_DATABASE", "CQ_TABLE", "CQ_PARTITION",
+ :succeeded, "CQ_TYPE", "CQ_TBLPROPERTIES", "CQ_WORKER_ID",
+ "CQ_START", %s, "CQ_RUN_AS", "CQ_HIGHEST_WRITE_ID", "CQ_META_INFO",
+ "CQ_HADOOP_JOB_ID", "CQ_ERROR_MESSAGE", "CQ_ENQUEUE_TIME",
+ "CQ_WORKER_VERSION", "CQ_INITIATOR_ID", "CQ_INITIATOR_VERSION",
+ "CQ_NEXT_TXN_ID", "CQ_TXN_ID", "CQ_COMMIT_TIME", "CQ_POOL_NAME",
"CQ_NUMBER_OF_BUCKETS",
+ "CQ_ORDER_BY"
+ FROM "COMPACTION_QUEUE"
+ WHERE "CQ_ID" = :id""".formatted(
+ getEpochFn(jdbcResource.getDatabaseProduct())),
+ param);
}
/* Remove compaction queue record corresponding to the compaction which
has been successful as well as
@@ -84,22 +89,30 @@ public Void execute(MultiDataSourceJdbcResource
jdbcResource) throws MetaExcepti
// Remove entries from completed_txn_components as well, so we don't
start looking there
// again but only up to the highest write ID include in this compaction
job.
//highestWriteId will be NULL in upgrade scenarios
- String query = "DELETE FROM \"COMPLETED_TXN_COMPONENTS\" WHERE
\"CTC_DATABASE\" = :db AND \"CTC_TABLE\" = :table";
+ String deleteQuery = """
+ DELETE FROM "COMPLETED_TXN_COMPONENTS" WHERE "CTC_DATABASE" = :db
AND "CTC_TABLE" = :table
+ """;
if (info.partName != null) {
- query += " AND \"CTC_PARTITION\" = :partition";
+ deleteQuery += """
+ AND "CTC_PARTITION" = :partition
+ """;
}
if (info.highestWriteId != 0) {
- query += " AND \"CTC_WRITEID\" <= :writeId";
+ deleteQuery += """
+ AND "CTC_WRITEID" <= :writeId
+ """;
}
param = new MapSqlParameterSource()
.addValue("db", info.dbname)
.addValue("table", info.tableName)
.addValue("writeId", info.highestWriteId);
+
if (info.partName != null) {
param.addValue("partition", info.partName);
}
- LOG.debug("Going to execute update <{}>", query);
- int updCount = jdbcTemplate.update(query, param);
+ LOG.debug("Going to execute update <{}>", deleteQuery);
+ int updCount = jdbcTemplate.update(deleteQuery, param);
+
if (updCount < 1) {
LOG.warn("Expected to remove at least one row from
completed_txn_components when " +
"marking compaction entry as clean!");
@@ -119,31 +132,34 @@ private void removeTxnComponents(CompactionInfo info,
MultiDataSourceJdbcResourc
* aborted TXN_COMPONENTS above tc_writeid (and consequently about aborted
txns).
* See {@link ql.txn.compactor.Cleaner.removeFiles()}
*/
-
MapSqlParameterSource params = new MapSqlParameterSource()
.addValue("state", TxnStatus.ABORTED.getSqlConst(), Types.CHAR)
.addValue("db", info.dbname)
.addValue("table", info.tableName)
- .addValue("partition", info.partName);
+ .addValue("partition", info.partName, Types.VARCHAR);
+
+ String deleteQuery = """
+ DELETE FROM "TXN_COMPONENTS"
+ WHERE "TC_TXNID" IN (
+ SELECT "TXN_ID" FROM "TXNS" WHERE "TXN_STATE" = :state
+ )
+ AND "TC_DATABASE" = :db AND "TC_TABLE" = :table
+ AND (:partition is NULL OR "TC_PARTITION" = :partition)
+ AND "TC_WRITEID" %s
+ """;
int totalCount = 0;
if (!info.hasUncompactedAborts && info.highestWriteId != 0) {
totalCount = jdbcResource.getJdbcTemplate().update(
- "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" IN ( "
- + "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = :state)
"
- + "AND \"TC_DATABASE\" = :db AND \"TC_TABLE\" = :table "
- + "AND (:partition is NULL OR \"TC_PARTITION\" = :partition) "
- + "AND \"TC_WRITEID\" <= :id",
+ deleteQuery.formatted("<= :id"),
params.addValue("id", info.highestWriteId));
+
} else if (CollectionUtils.isNotEmpty(info.writeIds)) {
- params.addValue("ids", new ArrayList<>(info.writeIds));
totalCount = jdbcResource.execute(new InClauseBatchCommand<>(
- "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" IN ( "
- + "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = :state)
"
- + "AND \"TC_DATABASE\" = :db AND \"TC_TABLE\" = :table "
- + "AND (:partition is NULL OR \"TC_PARTITION\" = :partition) "
- + "AND \"TC_WRITEID\" IN (:ids)",
- params, "ids", Long::compareTo));
+ deleteQuery.formatted("IN (:ids)"),
+ params.addValue("ids", new ArrayList<>(info.writeIds)),
+ "ids",
+ Long::compareTo));
}
LOG.debug("Removed {} records from txn_components", totalCount);
}
@@ -156,20 +172,24 @@ private void
removeCompactionAndAbortRetryEntries(CompactionInfo info, NamedPara
}
MapSqlParameterSource params = new MapSqlParameterSource("id", info.id);
- String query;
- if (info.isAbortedTxnCleanup()) {
- query = "DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = :id";
- } else {
- query = "DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = :id " +
- "OR (\"CQ_DATABASE\" = :db AND \"CQ_TABLE\" = :table AND \"CQ_TYPE\"
= :type AND (:partition is NULL OR \"CQ_PARTITION\" = :partition))";
+ String deleteQuery = """
+ DELETE FROM "COMPACTION_QUEUE" WHERE "CQ_ID" = :id
+ """;
+ if (!info.isAbortedTxnCleanup()) {
+ deleteQuery += """
+ OR ("CQ_DATABASE" = :db AND "CQ_TABLE" = :table
+ AND (:partition is NULL OR "CQ_PARTITION" = :partition)
+ AND "CQ_TYPE" = :type)
+ """;
params.addValue("db", info.dbname)
.addValue("table", info.tableName)
- .addValue("type",
Character.toString(TxnStore.ABORT_TXN_CLEANUP_TYPE), Types.CHAR)
- .addValue("partition", info.partName, Types.VARCHAR);
+ .addValue("partition", info.partName, Types.VARCHAR)
+ .addValue("type",
Character.toString(TxnStore.ABORT_TXN_CLEANUP_TYPE), Types.CHAR);
}
- LOG.debug("Going to execute update <{}>", query);
- int rc = jdbcTemplate.update(query, params);
+ LOG.debug("Going to execute update <{}>", deleteQuery);
+ int rc = jdbcTemplate.update(deleteQuery, params);
+
LOG.debug("Removed {} records in COMPACTION_QUEUE", rc);
}