rajkrrsingh commented on a change in pull request #1085:
URL: https://github.com/apache/hive/pull/1085#discussion_r443906128



##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -5363,6 +5363,121 @@ private void acquireTxnLock(Statement stmt, boolean 
shared) throws SQLException,
     LOG.debug("TXN lock locked by {} in mode {}", 
quoteString(TxnHandler.hostname), shared);
   }
 
+
+  @Override
+  @RetrySemantics.Idempotent
+  public void requestCleanup(CompactionInfo ci) throws MetaException {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      PreparedStatement pst = null;
+      TxnStore.MutexAPI.LockHandle handle = null;
+      try {
+        lockInternal();
+        /**
+         * MUTEX_KEY.CompactionScheduler lock ensures that there is only 1 
entry in
+         * Initiated/Working state for any resource.  This ensures that we 
don't run concurrent
+         * compactions for any resource.
+         */
+        handle = 
getMutexAPI().acquireLock(MUTEX_KEY.CompactionScheduler.name());
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+
+        long id = generateCompactionQueueId(stmt);
+
+        List<String> params = new ArrayList<>();
+        StringBuilder sb = new StringBuilder("select cq_id, cq_state from 
COMPACTION_QUEUE where").
+                append(" cq_state IN(").append(quoteChar(INITIATED_STATE)).
+                append(",").append(quoteChar(WORKING_STATE)).
+                append(") AND cq_database=?").
+                append(" AND cq_table=?").append(" AND ");
+        params.add(ci.dbname);
+        params.add(ci.tableName);
+        if(ci.partName == null) {
+          sb.append("cq_partition is null");
+        } else {
+          sb.append("cq_partition=?");
+          params.add(ci.partName);
+        }
+
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, sb.toString(), 
params);
+        LOG.debug("Going to execute query <" + sb.toString() + ">");
+        ResultSet rs = pst.executeQuery();
+        if(rs.next()) {
+          long enqueuedId = rs.getLong(1);
+          String state = compactorStateToResponse(rs.getString(2).charAt(0));
+          LOG.info("Ignoring request to clean up for " + ci.dbname + "/" + 
ci.tableName +
+                  "/" + ci.partName + " since it is already " + 
quoteString(state) +
+                  " with id=" + enqueuedId);
+        }
+        close(rs);
+        closeStmt(pst);

Review comment:
       Incorporated the suggested change.
   
   

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -5363,6 +5363,121 @@ private void acquireTxnLock(Statement stmt, boolean 
shared) throws SQLException,
     LOG.debug("TXN lock locked by {} in mode {}", 
quoteString(TxnHandler.hostname), shared);
   }
 
+
+  @Override
+  @RetrySemantics.Idempotent
+  public void requestCleanup(CompactionInfo ci) throws MetaException {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      PreparedStatement pst = null;
+      TxnStore.MutexAPI.LockHandle handle = null;
+      try {
+        lockInternal();
+        /**
+         * MUTEX_KEY.CompactionScheduler lock ensures that there is only 1 
entry in
+         * Initiated/Working state for any resource.  This ensures that we 
don't run concurrent
+         * compactions for any resource.
+         */
+        handle = 
getMutexAPI().acquireLock(MUTEX_KEY.CompactionScheduler.name());
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+
+        long id = generateCompactionQueueId(stmt);
+
+        List<String> params = new ArrayList<>();
+        StringBuilder sb = new StringBuilder("select cq_id, cq_state from 
COMPACTION_QUEUE where").
+                append(" cq_state IN(").append(quoteChar(INITIATED_STATE)).
+                append(",").append(quoteChar(WORKING_STATE)).
+                append(") AND cq_database=?").
+                append(" AND cq_table=?").append(" AND ");
+        params.add(ci.dbname);
+        params.add(ci.tableName);
+        if(ci.partName == null) {
+          sb.append("cq_partition is null");
+        } else {
+          sb.append("cq_partition=?");
+          params.add(ci.partName);
+        }
+
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, sb.toString(), 
params);
+        LOG.debug("Going to execute query <" + sb.toString() + ">");
+        ResultSet rs = pst.executeQuery();
+        if(rs.next()) {
+          long enqueuedId = rs.getLong(1);
+          String state = compactorStateToResponse(rs.getString(2).charAt(0));
+          LOG.info("Ignoring request to clean up for " + ci.dbname + "/" + 
ci.tableName +
+                  "/" + ci.partName + " since it is already " + 
quoteString(state) +
+                  " with id=" + enqueuedId);
+        }
+        close(rs);
+        closeStmt(pst);
+        params.clear();
+        StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE 
(cq_id, cq_database, " +
+                "cq_table, ");
+        String partName = ci.partName;

Review comment:
       Incorporated the suggested change.
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to