[
https://issues.apache.org/jira/browse/HIVE-26267?focusedWorklogId=777958&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-777958
]
ASF GitHub Bot logged work on HIVE-26267:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Jun/22 07:54
Start Date: 03/Jun/22 07:54
Worklog Time Spent: 10m
Work Description: deniskuzZ commented on code in PR #3325:
URL: https://github.com/apache/hive/pull/3325#discussion_r888707129
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -3701,122 +3698,127 @@ public CompactionResponse compact(CompactionRequest
rqst) throws MetaException {
* compactions for any resource.
*/
handle =
getMutexAPI().acquireLock(MUTEX_KEY.CompactionScheduler.name());
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- stmt = dbConn.createStatement();
- long id = generateCompactionQueueId(stmt);
-
- GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(
- Collections.singletonList(getFullTableName(rqst.getDbname(),
rqst.getTablename())));
- final ValidCompactorWriteIdList tblValidWriteIds =
-
TxnUtils.createValidCompactWriteIdList(getValidWriteIds(request).getTblValidWriteIds().get(0));
- LOG.debug("ValidCompactWriteIdList: " +
tblValidWriteIds.writeToString());
+ try (Connection dbConn =
getDbConn(Connection.TRANSACTION_READ_COMMITTED)) {
+ try (Statement stmt = dbConn.createStatement()) {
+
+ long id = generateCompactionQueueId(stmt);
+
+ GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(
+ Collections.singletonList(getFullTableName(rqst.getDbname(),
rqst.getTablename())));
+ final ValidCompactorWriteIdList tblValidWriteIds =
+
TxnUtils.createValidCompactWriteIdList(getValidWriteIds(request).getTblValidWriteIds().get(0));
+ LOG.debug("ValidCompactWriteIdList: " +
tblValidWriteIds.writeToString());
+
+ StringBuilder sb = new StringBuilder("SELECT \"CQ_ID\",
\"CQ_STATE\" FROM \"COMPACTION_QUEUE\" WHERE").
+ append(" (\"CQ_STATE\" IN(").
+
append(quoteChar(INITIATED_STATE)).append(",").append(quoteChar(WORKING_STATE)).
+ append(") OR (\"CQ_STATE\" =
").append(quoteChar(READY_FOR_CLEANING)).
+ append(" AND \"CQ_HIGHEST_WRITE_ID\" = ?))").
+ append(" AND \"CQ_DATABASE\"=?").
+ append(" AND \"CQ_TABLE\"=?").append(" AND ");
+ if(rqst.getPartitionname() == null) {
+ sb.append("\"CQ_PARTITION\" is null");
+ } else {
+ sb.append("\"CQ_PARTITION\"=?");
+ }
- List<String> params = new ArrayList<>();
- StringBuilder sb = new StringBuilder("SELECT \"CQ_ID\", \"CQ_STATE\"
FROM \"COMPACTION_QUEUE\" WHERE").
- append(" (\"CQ_STATE\" IN(").
-
append(quoteChar(INITIATED_STATE)).append(",").append(quoteChar(WORKING_STATE)).
- append(") OR (\"CQ_STATE\" =
").append(quoteChar(READY_FOR_CLEANING)).
- append(" AND \"CQ_HIGHEST_WRITE_ID\" = ?))").
- append(" AND \"CQ_DATABASE\"=?").
- append(" AND \"CQ_TABLE\"=?").append(" AND ");
- params.add(Long.toString(tblValidWriteIds.getHighWatermark()));
- params.add(rqst.getDbname());
- params.add(rqst.getTablename());
- if(rqst.getPartitionname() == null) {
- sb.append("\"CQ_PARTITION\" is null");
- } else {
- sb.append("\"CQ_PARTITION\"=?");
- params.add(rqst.getPartitionname());
- }
+ try (PreparedStatement pst =
dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(sb.toString()))) {
+ pst.setLong(1, tblValidWriteIds.getHighWatermark());
+ pst.setString(2, rqst.getDbname());
+ pst.setString(3, rqst.getTablename());
+ if (rqst.getPartitionname() != null) {
+ pst.setString(4, rqst.getPartitionname());
+ }
+ LOG.debug("Going to execute query <" + sb + ">");
+ try (ResultSet rs = pst.executeQuery()) {
+ if(rs.next()) {
+ long enqueuedId = rs.getLong(1);
+ String state =
compactorStateToResponse(rs.getString(2).charAt(0));
+ LOG.info("Ignoring request to compact " + rqst.getDbname() +
"/" + rqst.getTablename() +
+ "/" + rqst.getPartitionname() + " since it is already "
+ quoteString(state) +
+ " with id=" + enqueuedId);
+ CompactionResponse resp = new CompactionResponse(-1,
REFUSED_RESPONSE, false);
+ resp.setErrormessage("Compaction is already scheduled with
state=" + quoteString(state) +
+ " and id=" + enqueuedId);
+ return resp;
+ }
+ }
+ }
+ List<String> params = new ArrayList<>();
+ StringBuilder buf = new StringBuilder("INSERT INTO
\"COMPACTION_QUEUE\" (\"CQ_ID\", \"CQ_DATABASE\", " +
+ "\"CQ_TABLE\", ");
+ String partName = rqst.getPartitionname();
+ if (partName != null) buf.append("\"CQ_PARTITION\", ");
+ buf.append("\"CQ_STATE\", \"CQ_TYPE\", \"CQ_ENQUEUE_TIME\"");
+ if (rqst.getProperties() != null) {
+ buf.append(", \"CQ_TBLPROPERTIES\"");
+ }
+ if (rqst.getRunas() != null) {
+ buf.append(", \"CQ_RUN_AS\"");
+ }
+ if (rqst.getInitiatorId() != null) {
+ buf.append(", \"CQ_INITIATOR_ID\"");
+ }
+ if (rqst.getInitiatorVersion() != null) {
+ buf.append(", \"CQ_INITIATOR_VERSION\"");
+ }
+ buf.append(") values (");
+ buf.append(id);
+ buf.append(", ?");
+ buf.append(", ?");
+ buf.append(", ");
+ params.add(rqst.getDbname());
+ params.add(rqst.getTablename());
+ if (partName != null) {
+ buf.append("?, '");
+ params.add(partName);
+ } else {
+ buf.append("'");
+ }
+ buf.append(INITIATED_STATE);
+ buf.append("', '");
+ buf.append(thriftCompactionType2DbType(rqst.getType()));
+ buf.append("',");
+ buf.append(getEpochFn(dbProduct));
+ if (rqst.getProperties() != null) {
+ buf.append(", ?");
+ params.add(new StringableMap(rqst.getProperties()).toString());
+ }
+ if (rqst.getRunas() != null) {
+ buf.append(", ?");
+ params.add(rqst.getRunas());
+ }
+ if (rqst.getInitiatorId() != null) {
+ buf.append(", ?");
+ params.add(rqst.getInitiatorId());
+ }
+ if (rqst.getInitiatorVersion() != null) {
+ buf.append(", ?");
+ params.add(rqst.getInitiatorVersion());
+ }
+ buf.append(")");
+ String s = buf.toString();
- pst = sqlGenerator.prepareStmtWithParameters(dbConn, sb.toString(),
params);
- LOG.debug("Going to execute query <" + sb + ">");
- ResultSet rs = pst.executeQuery();
- if(rs.next()) {
- long enqueuedId = rs.getLong(1);
- String state = compactorStateToResponse(rs.getString(2).charAt(0));
- LOG.info("Ignoring request to compact " + rqst.getDbname() + "/" +
rqst.getTablename() +
- "/" + rqst.getPartitionname() + " since it is already " +
quoteString(state) +
- " with id=" + enqueuedId);
- CompactionResponse resp = new CompactionResponse(-1,
REFUSED_RESPONSE, false);
- resp.setErrormessage("Compaction is already scheduled with state=" +
quoteString(state) +
- " and id=" + enqueuedId);
- return resp;
- }
- close(rs);
- closeStmt(pst);
- params.clear();
- StringBuilder buf = new StringBuilder("INSERT INTO
\"COMPACTION_QUEUE\" (\"CQ_ID\", \"CQ_DATABASE\", " +
- "\"CQ_TABLE\", ");
- String partName = rqst.getPartitionname();
- if (partName != null) buf.append("\"CQ_PARTITION\", ");
- buf.append("\"CQ_STATE\", \"CQ_TYPE\", \"CQ_ENQUEUE_TIME\"");
- if (rqst.getProperties() != null) {
- buf.append(", \"CQ_TBLPROPERTIES\"");
- }
- if (rqst.getRunas() != null) {
- buf.append(", \"CQ_RUN_AS\"");
- }
- if (rqst.getInitiatorId() != null) {
- buf.append(", \"CQ_INITIATOR_ID\"");
- }
- if (rqst.getInitiatorVersion() != null) {
- buf.append(", \"CQ_INITIATOR_VERSION\"");
- }
- buf.append(") values (");
- buf.append(id);
- buf.append(", ?");
- buf.append(", ?");
- buf.append(", ");
- params.add(rqst.getDbname());
- params.add(rqst.getTablename());
- if (partName != null) {
- buf.append("?, '");
- params.add(partName);
- } else {
- buf.append("'");
- }
- buf.append(INITIATED_STATE);
- buf.append("', '");
- buf.append(thriftCompactionType2DbType(rqst.getType()));
- buf.append("',");
- buf.append(getEpochFn(dbProduct));
- if (rqst.getProperties() != null) {
- buf.append(", ?");
- params.add(new StringableMap(rqst.getProperties()).toString());
- }
- if (rqst.getRunas() != null) {
- buf.append(", ?");
- params.add(rqst.getRunas());
- }
- if (rqst.getInitiatorId() != null) {
- buf.append(", ?");
- params.add(rqst.getInitiatorId());
- }
- if (rqst.getInitiatorVersion() != null) {
- buf.append(", ?");
- params.add(rqst.getInitiatorVersion());
+ try (PreparedStatement pst =
sqlGenerator.prepareStmtWithParameters(dbConn, s, params)) {
+ LOG.debug("Going to execute update <" + s + ">");
+ pst.executeUpdate();
+ }
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ return new CompactionResponse(id, INITIATED_RESPONSE, true);
+ } catch (SQLException e) {
+ dbConn.rollback();
+ throw e;
+ }
}
- buf.append(")");
- String s = buf.toString();
- pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
- LOG.debug("Going to execute update <" + s + ">");
- pst.executeUpdate();
- LOG.debug("Going to commit");
- dbConn.commit();
- return new CompactionResponse(id, INITIATED_RESPONSE, true);
} catch (SQLException e) {
LOG.debug("Going to rollback: ", e);
- rollbackDBConn(dbConn);
checkRetryable(e, "COMPACT(" + rqst + ")");
throw new MetaException("Unable to select from transaction database " +
Review Comment:
please update the error message
Issue Time Tracking
-------------------
Worklog Id: (was: 777958)
Time Spent: 2.5h (was: 2h 20m)
> Addendum to HIVE-26107: perpared statement is not working on Postgres
> ---------------------------------------------------------------------
>
> Key: HIVE-26267
> URL: https://issues.apache.org/jira/browse/HIVE-26267
> Project: Hive
> Issue Type: Bug
> Reporter: László Végh
> Assignee: László Végh
> Priority: Major
> Labels: pull-request-available
> Time Spent: 2.5h
> Remaining Estimate: 0h
>
> The assembled prepared statement in
> {code:java}
> org.apache.hadoop.hive.metastore.txn.TxnHandler#compact{code}
> does not work for Postgres DB.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)