rajkrrsingh commented on a change in pull request #1085:
URL: https://github.com/apache/hive/pull/1085#discussion_r443906481



##########
File path: 
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -5363,6 +5363,121 @@ private void acquireTxnLock(Statement stmt, boolean 
shared) throws SQLException,
     LOG.debug("TXN lock locked by {} in mode {}", 
quoteString(TxnHandler.hostname), shared);
   }
 
+
+  @Override
+  @RetrySemantics.Idempotent
+  public void requestCleanup(CompactionInfo ci) throws MetaException {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      PreparedStatement pst = null;
+      TxnStore.MutexAPI.LockHandle handle = null;
+      try {
+        lockInternal();
+        /**
+         * MUTEX_KEY.CompactionScheduler lock ensures that there is only 1 
entry in
+         * Initiated/Working state for any resource.  This ensures that we 
don't run concurrent
+         * compactions for any resource.
+         */
+        handle = 
getMutexAPI().acquireLock(MUTEX_KEY.CompactionScheduler.name());
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+
+        long id = generateCompactionQueueId(stmt);
+
+        List<String> params = new ArrayList<>();
+        StringBuilder sb = new StringBuilder("select cq_id, cq_state from 
COMPACTION_QUEUE where").
+                append(" cq_state IN(").append(quoteChar(INITIATED_STATE)).
+                append(",").append(quoteChar(WORKING_STATE)).
+                append(") AND cq_database=?").
+                append(" AND cq_table=?").append(" AND ");
+        params.add(ci.dbname);
+        params.add(ci.tableName);
+        if(ci.partName == null) {
+          sb.append("cq_partition is null");
+        } else {
+          sb.append("cq_partition=?");
+          params.add(ci.partName);
+        }
+
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, sb.toString(), 
params);
+        LOG.debug("Going to execute query <" + sb.toString() + ">");
+        ResultSet rs = pst.executeQuery();
+        if(rs.next()) {
+          long enqueuedId = rs.getLong(1);
+          String state = compactorStateToResponse(rs.getString(2).charAt(0));
+          LOG.info("Ignoring request to clean up for " + ci.dbname + "/" + 
ci.tableName +
+                  "/" + ci.partName + " since it is already " + 
quoteString(state) +
+                  " with id=" + enqueuedId);
+        }
+        close(rs);
+        closeStmt(pst);
+        params.clear();
+        StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE 
(cq_id, cq_database, " +
+                "cq_table, ");
+        String partName = ci.partName;
+        if (partName != null) {
+          buf.append("cq_partition, ");
+        }
+        buf.append("cq_state, cq_type");
+        if (ci.properties != null) {
+          buf.append(", cq_tblproperties");
+        }
+        if (ci.runAs != null) {
+          buf.append(", cq_run_as");
+        }
+        buf.append(") values (");
+        buf.append(id);
+        buf.append(", ?");
+        buf.append(", ?");
+        buf.append(", ");
+        params.add(ci.dbname);
+        params.add(ci.tableName);
+        if (partName != null) {
+          buf.append("?, '");
+          params.add(partName);
+        } else {
+          buf.append("'");
+        }
+        buf.append(READY_FOR_CLEANING);
+        buf.append("', '");
+        buf.append(MAJOR_TYPE);
+        buf.append("'");
+        if (ci.properties != null) {
+          buf.append(", ?");
+          params.add(ci.properties);
+        }
+        if (ci.runAs != null) {
+          buf.append(", ?");
+          params.add(ci.runAs);
+        }
+        buf.append(")");
+        String s = buf.toString();
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+        LOG.debug("Going to execute update <" + s + ">");
+        pst.executeUpdate();
+        LOG.debug("Going to commit");
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "requestCleanup(" + ci + ")");
+        throw new MetaException("Unable to select from transaction database " +
+                StringUtils.stringifyException(e));
+      } finally {
+        closeStmt(pst);
+        closeStmt(stmt);
+        closeDbConn(dbConn);

Review comment:
       Incorporated the suggested change for stmt and pst. Using try-with-resources 
for dbConn makes the code clumsy, with so many nested try-catch blocks, so I skipped it.
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to