deniskuzZ commented on code in PR #4384:
URL: https://github.com/apache/hive/pull/4384#discussion_r1293066381


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java:
##########
@@ -1396,245 +388,59 @@ private void 
insertAbortRetryRetentionTimeOnError(Connection dbConn, CompactionI
   @Override
   @RetrySemantics.SafeToRetry
   public void purgeCompactionHistory() throws MetaException {
-    Connection dbConn = null;
-    Statement stmt = null;
-    PreparedStatement pStmt = null;
-    ResultSet rs = null;
-    List<Long> deleteSet = new ArrayList<>();
-    RetentionCounters rc = null;
-    long timeoutThreshold = System.currentTimeMillis() -
-            MetastoreConf.getTimeVar(conf, 
ConfVars.COMPACTOR_HISTORY_RETENTION_TIMEOUT, TimeUnit.MILLISECONDS);
-    int didNotInitiateRetention = MetastoreConf.getIntVar(conf, 
ConfVars.COMPACTOR_HISTORY_RETENTION_DID_NOT_INITIATE);
-    int failedRetention = getFailedCompactionRetention();
-    int succeededRetention = MetastoreConf.getIntVar(conf, 
ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED);
-    int refusedRetention = MetastoreConf.getIntVar(conf, 
ConfVars.COMPACTOR_HISTORY_RETENTION_REFUSED);
-    try {
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
-        stmt = dbConn.createStatement();
-        /* cc_id is monotonically increasing so for any entity sorts in order 
of compaction history,
-        thus this query groups by entity and withing group sorts most recent 
first */
-        rs = stmt.executeQuery("SELECT \"CC_ID\", \"CC_DATABASE\", 
\"CC_TABLE\", \"CC_PARTITION\", "
-            + "\"CC_STATE\" , \"CC_START\", \"CC_TYPE\" "
-            + "FROM \"COMPLETED_COMPACTIONS\" ORDER BY \"CC_DATABASE\", 
\"CC_TABLE\", \"CC_PARTITION\"," +
-                "\"CC_ID\" DESC");
-        String lastCompactedEntity = null;
-        /* In each group, walk from most recent and count occurrences of each 
state type.  Once you
-        * have counted enough (for each state) to satisfy retention policy, 
delete all other
-        * instances of this status, plus timed-out entries (see this method's 
JavaDoc).
-        */
-        while(rs.next()) {
-          CompactionInfo ci = new CompactionInfo(
-              rs.getLong(1), rs.getString(2), rs.getString(3),
-              rs.getString(4), rs.getString(5).charAt(0));
-          ci.start = rs.getLong(6);
-          ci.type = 
TxnUtils.dbCompactionType2ThriftType(rs.getString(7).charAt(0));
-          if(!ci.getFullPartitionName().equals(lastCompactedEntity)) {
-            lastCompactedEntity = ci.getFullPartitionName();
-            rc = new RetentionCounters(didNotInitiateRetention, 
failedRetention, succeededRetention, refusedRetention);
-          }
-          checkForDeletion(deleteSet, ci, rc, timeoutThreshold);
-        }
-        close(rs);
-
-        if (deleteSet.size() <= 0) {
-          return;
-        }
-
-        List<String> queries = new ArrayList<>();
-
-        StringBuilder prefix = new StringBuilder();
-        StringBuilder suffix = new StringBuilder();
-
-        prefix.append("DELETE FROM \"COMPLETED_COMPACTIONS\" WHERE ");
-
-        List<String> questions = new ArrayList<>(deleteSet.size());
-        for (int  i = 0; i < deleteSet.size(); i++) {
-          questions.add("?");
-        }
-        List<Integer> counts = TxnUtils.buildQueryWithINClauseStrings(conf, 
queries, prefix, suffix, questions,
-            "\"CC_ID\"", false, false);
-        int totalCount = 0;
-        for (int i = 0; i < queries.size(); i++) {
-          String query = queries.get(i);
-          long insertCount = counts.get(i);
-          LOG.debug("Going to execute update <{}>", query);
-          pStmt = dbConn.prepareStatement(query);
-          for (int j = 0; j < insertCount; j++) {
-            pStmt.setLong(j + 1, deleteSet.get(totalCount + j));
-          }
-          totalCount += insertCount;
-          int count = pStmt.executeUpdate();
-          LOG.debug("Removed {} records from COMPLETED_COMPACTIONS", count);
-        }
-        dbConn.commit();
-      } catch (SQLException e) {
-        rollbackDBConn(dbConn);
-        checkRetryable(e, "purgeCompactionHistory()");
-        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
-      } finally {
-        close(rs, stmt, dbConn);
-        closeStmt(pStmt);
-      }
-    } catch (RetryException ex) {
-      purgeCompactionHistory();
-    }
-  }
-
-  /**
-   * this ensures that the number of failed compaction entries retained is > 
than number of failed
-   * compaction threshold which prevents new compactions from being scheduled.
-   */
-  private int getFailedCompactionRetention() {
-    int failedThreshold = MetastoreConf.getIntVar(conf, 
ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
-    int failedRetention = MetastoreConf.getIntVar(conf, 
ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED);
-    if(failedRetention < failedThreshold) {
-      LOG.warn("Invalid configuration {}={} < {}={}.  Will use {}={}",
-          ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.getVarname(), 
failedRetention,
-          ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED, failedRetention,
-          ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.getVarname(), 
failedRetention);
-      failedRetention = failedThreshold;
-    }
-    return failedRetention;
+    new PurgeCompactionHistoryFunction(conf).call(dataSourceWrapper);
   }
 
   /**
    * Returns {@code true} if there already exists sufficient number of 
consecutive failures for
    * this table/partition so that no new automatic compactions will be 
scheduled.
    * User initiated compactions don't do this check.
-   *
    * Do we allow compacting whole table (when it's partitioned)?  No, though 
perhaps we should.
    * That would be a meta operations, i.e. first find all partitions for this 
table (which have
    * txn info) and schedule each compaction separately.  This avoids 
complications in this logic.
    */
   @Override
   @RetrySemantics.ReadOnly
   public boolean checkFailedCompactions(CompactionInfo ci) throws 
MetaException {
-    Connection dbConn = null;
-    PreparedStatement pStmt = null;
-    ResultSet rs = null;
-    try {
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
-        pStmt = dbConn.prepareStatement("SELECT \"CC_STATE\", 
\"CC_ENQUEUE_TIME\" FROM \"COMPLETED_COMPACTIONS\" WHERE " +
-          "\"CC_DATABASE\" = ? AND " +
-          "\"CC_TABLE\" = ? " +
-          (ci.partName != null ? "AND \"CC_PARTITION\" = ?" : "") +
-          " AND \"CC_STATE\" != " + quoteChar(DID_NOT_INITIATE) + " ORDER BY 
\"CC_ID\" DESC");
-        pStmt.setString(1, ci.dbname);
-        pStmt.setString(2, ci.tableName);
-        if (ci.partName != null) {
-          pStmt.setString(3, ci.partName);
-        }
-        rs = pStmt.executeQuery();
-        int numFailed = 0;
-        int numTotal = 0;
-        long lastEnqueueTime = -1;
-        int failedThreshold = MetastoreConf.getIntVar(conf, 
ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
-        while(rs.next() && ++numTotal <= failedThreshold) {
-          long enqueueTime = rs.getLong(2);
-          if (enqueueTime > lastEnqueueTime) {
-            lastEnqueueTime = enqueueTime;
-          }
-          if(rs.getString(1).charAt(0) == FAILED_STATE) {
-            numFailed++;
-          }
-          else {
-            numFailed--;
-          }
-        }
-        // If the last attempt was too long ago, ignore the failed threshold 
and try compaction again
-        long retryTime = MetastoreConf.getTimeVar(conf,
-            ConfVars.COMPACTOR_INITIATOR_FAILED_RETRY_TIME, 
TimeUnit.MILLISECONDS);
-        boolean needsRetry = (retryTime > 0) && (lastEnqueueTime + retryTime < 
System.currentTimeMillis());
-        return (numFailed == failedThreshold) && !needsRetry;
-      }
-      catch (SQLException e) {
-        LOG.error("Unable to check for failed compactions", e);
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(e, "checkFailedCompactions(" + ci + ")");
-        LOG.error(DB_FAILED_TO_CONNECT, e);
-        return false;//weren't able to check
-      } finally {
-        close(rs, pStmt, dbConn);
-      }
-    } catch (RetryException e) {
-      return checkFailedCompactions(ci);
-    }
+    return new CheckFailedCompactionsHandler(conf, 
ci).execute(dataSourceWrapper);
   }
 
-
   private void updateStatus(CompactionInfo ci) throws MetaException {
     String strState = CompactionState.fromSqlConst(ci.state).toString();
+
     LOG.debug("Marking as {}: CompactionInfo: {}", strState, ci);
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      PreparedStatement pStmt = null;
-      ResultSet rs = null;
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
-        stmt = dbConn.createStatement();
-        pStmt = dbConn.prepareStatement("SELECT \"CQ_ID\", \"CQ_DATABASE\", 
\"CQ_TABLE\", \"CQ_PARTITION\", "
-                + "\"CQ_STATE\", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", 
\"CQ_WORKER_ID\", \"CQ_START\", \"CQ_RUN_AS\", "
-                + "\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", 
\"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\", "
-                + "\"CQ_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\", 
\"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\", "
-                + "\"CQ_RETRY_RETENTION\", \"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\", 
\"CQ_COMMIT_TIME\", \"CQ_POOL_NAME\", "
-                + "\"CQ_NUMBER_OF_BUCKETS\", \"CQ_ORDER_BY\" FROM 
\"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?");
-        pStmt.setLong(1, ci.id);
-        rs = pStmt.executeQuery();
-        if (rs.next()) {
-          //preserve errorMessage and state
-          String errorMessage = ci.errorMessage;
-          char state = ci.state;
-          ci = CompactionInfo.loadFullFromCompactionQueue(rs);
-          ci.errorMessage = errorMessage;
-          ci.state = state;
+    CompactionInfo ciActual = new GetCompactionInfoHandler(ci.id, 
false).execute(dataSourceWrapper); 
 
-          pStmt = dbConn.prepareStatement(DELETE_CQ_ENTRIES);
-          pStmt.setLong(1, ci.id);
-          LOG.debug("Going to execute update <{}>", DELETE_CQ_ENTRIES);
-          pStmt.executeUpdate();
-        }
-        else {
-          if(ci.id > 0) {
-            //the record with valid CQ_ID has disappeared - this is a sign of 
something wrong
-            throw new IllegalStateException("No record with CQ_ID=" + ci.id + 
" found in COMPACTION_QUEUE");
-          }
-        }
-        if(ci.id == 0) {
-          //The failure occurred before we even made an entry in 
COMPACTION_QUEUE
-          //generate ID so that we can make an entry in COMPLETED_COMPACTIONS
-          ci.id = generateCompactionQueueId(stmt);
-          //this is not strictly accurate, but 'type' cannot be null.
-          if(ci.type == null) {
-            ci.type = CompactionType.MINOR;
-          }
-          ci.start = getDbTime(dbConn);
-          LOG.debug("The failure occurred before we even made an entry in 
COMPACTION_QUEUE. Generated ID so that we "
-                  + "can make an entry in COMPLETED_COMPACTIONS. New Id: {}", 
ci.id);
-        }
-        close(rs, stmt, null);
-        closeStmt(pStmt);
+    long endTime = getDbTime().getTime();
+    if (ciActual != null) {
+      //preserve errorMessage and state
+      ciActual.errorMessage = ci.errorMessage;
+      ciActual.state = ci.state;
 
-        pStmt = 
dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION);
-        CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, 
getDbTime(dbConn));
-        int updCount = pStmt.executeUpdate();
-        LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", updCount);
-        closeStmt(pStmt);
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.error("Failed to mark compaction request as " + strState + ", 
rolling back transaction: " + ci, e);
-        rollbackDBConn(dbConn);
-        checkRetryable(e, "updateStatus(" + ci + ")");
-      } finally {
-        close(rs, stmt, null);
-        close(null, pStmt, dbConn);
+      dataSourceWrapper.getJdbcTemplate().update("DELETE FROM 
\"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = :id",
+          new MapSqlParameterSource("id", ci.id));
+    } else {
+      if (ci.id > 0) {
+        //the record with valid CQ_ID has disappeared - this is a sign of 
something wrong
+        throw new IllegalStateException("No record with CQ_ID=" + ci.id + " 
found in COMPACTION_QUEUE");
+      }
+      ciActual = ci;
+    }
+    if (ciActual.id == 0) {
+      //The failure occurred before we even made an entry in COMPACTION_QUEUE
+      //generate ID so that we can make an entry in COMPLETED_COMPACTIONS
+      ciActual.id = generateCompactionQueueId();
+      //this is not strictly accurate, but 'type' cannot be null.
+      if (ciActual.type == null) {
+        ciActual.type = CompactionType.MINOR;
       }
-    } catch (RetryException e) {
-      updateStatus(ci);
+      //in case of creating a new entry start and end time will be the same
+      ciActual.start = endTime;
+      LOG.debug("The failure occurred before we even made an entry in 
COMPACTION_QUEUE. Generated ID so that we "
+          + "can make an entry in COMPLETED_COMPACTIONS. New Id: {}", 
ciActual.id);
     }
+
+    new InsertCompactionInfoCommand(ciActual, 
endTime).execute(dataSourceWrapper);

Review Comment:
   The naming is not consistent: in some places it's a handler, in others a 
command. The approach is also not consistent: a mix of direct jdbcTemplate 
calls and one-liner commands.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to