veghlaci05 commented on code in PR #3576:
URL: https://github.com/apache/hive/pull/3576#discussion_r1118536472


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -5894,6 +5902,63 @@ private void addTxnToMinHistoryLevel(Connection dbConn, List<Long> txnIds, long
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public void addWriteIdsToMinHistory(long txnid, Map<String, Long> minOpenWriteIds) throws MetaException {
+    if (!useMinHistoryWriteId) {
+      return;
+    }
+    // Need to register minimum open writeId for current transactions into MIN_HISTORY_WRITE_ID table.
+    try {
+      Connection dbConn = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);

Review Comment:
   Try-with-resources instead?
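
   A minimal sketch of what I mean (assuming nothing after this block still
   needs dbConn, e.g. for commit/rollback; that part of the method is not
   visible in this hunk):

   ```java
   try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
        PreparedStatement pstmt = dbConn.prepareStatement(MIN_HISTORY_WRITE_ID_INSERT_QUERY)) {
     // bind the parameters and run the batched inserts as in the hunk below
   }
   ```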



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -5894,6 +5902,63 @@ private void addTxnToMinHistoryLevel(Connection dbConn, List<Long> txnIds, long
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public void addWriteIdsToMinHistory(long txnid, Map<String, Long> minOpenWriteIds) throws MetaException {
+    if (!useMinHistoryWriteId) {
+      return;
+    }
+    // Need to register minimum open writeId for current transactions into MIN_HISTORY_WRITE_ID table.
+    try {
+      Connection dbConn = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        try (PreparedStatement pstmt = dbConn.prepareStatement(MIN_HISTORY_WRITE_ID_INSERT_QUERY)) {
+          int writeId = 0;
+
+          for (Map.Entry<String, Long> validWriteId : minOpenWriteIds.entrySet()) {
+            String[] names = TxnUtils.getDbTableName(validWriteId.getKey());
+
+            pstmt.setLong(1, txnid);
+            pstmt.setString(2, names[0]);
+            pstmt.setString(3, names[1]);
+            pstmt.setLong(4, validWriteId.getValue());
+
+            pstmt.addBatch();
+            writeId++;
+            if (writeId % maxBatchSize == 0) {
+              LOG.debug("Executing a batch of <" + TXN_TO_WRITE_ID_INSERT_QUERY + "> queries. " +
+                "Batch size: " + maxBatchSize);
+              pstmt.executeBatch();
+            }
+          }
+          if (writeId % maxBatchSize != 0) {
+            LOG.debug("Executing a batch of <" + TXN_TO_WRITE_ID_INSERT_QUERY + "> queries. " +
+              "Batch size: " + writeId % maxBatchSize);
+            pstmt.executeBatch();
+          }
+        }

Review Comment:
   Maybe a utility method which accepts a lambda that sets the PreparedStatement parameters? I see this code pattern again and again in this class. Something like `void executeInBatch(PreparedStatement statement, Iterable<T> list, Consumer<T> batchCreator)`.
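
   A rough sketch of such a helper (the SqlConsumer interface, the generics
   and the method name are illustrative, not existing Hive API; it reuses the
   surrounding maxBatchSize field):

   ```java
   @FunctionalInterface
   interface SqlConsumer<T> {
     void accept(T item) throws SQLException;
   }

   private <T> void executeInBatch(PreparedStatement statement, Iterable<T> items,
       SqlConsumer<T> batchCreator) throws SQLException {
     int count = 0;
     for (T item : items) {
       batchCreator.accept(item); // binds the parameters for this item
       statement.addBatch();
       if (++count % maxBatchSize == 0) {
         statement.executeBatch();
       }
     }
     if (count % maxBatchSize != 0) {
       statement.executeBatch(); // flush the remainder
     }
   }
   ```

   The call site above would then shrink to a single lambda:

   ```java
   executeInBatch(pstmt, minOpenWriteIds.entrySet(), entry -> {
     String[] names = TxnUtils.getDbTableName(entry.getKey());
     pstmt.setLong(1, txnid);
     pstmt.setString(2, names[0]);
     pstmt.setString(3, names[1]);
     pstmt.setLong(4, entry.getValue());
   });
   ```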



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java:
##########
@@ -140,41 +140,36 @@ public void run() {
                    HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_DURATION_UPDATE_INTERVAL, TimeUnit.MILLISECONDS),
                    new CleanerCycleUpdater(MetricsConstants.COMPACTION_CLEANER_CYCLE_DURATION, startedAt));
           }
-
           long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-
           checkInterrupt();
 
          List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
-
           checkInterrupt();
 
           if (!readyToClean.isEmpty()) {
-            long minTxnIdSeenOpen = txnHandler.findMinTxnIdSeenOpen();
-            final long cleanerWaterMark =
-                minTxnIdSeenOpen < 0 ? minOpenTxnId : Math.min(minOpenTxnId, minTxnIdSeenOpen);
-
-            LOG.info("Cleaning based on min open txn id: " + cleanerWaterMark);
             List<CompletableFuture<Void>> cleanerList = new ArrayList<>();
            // For checking which compaction can be cleaned we can use the minOpenTxnId
            // However findReadyToClean will return all records that were compacted with old version of HMS
            // where the CQ_NEXT_TXN_ID is not set. For these compactions we need to provide minTxnIdSeenOpen
            // to the clean method, to avoid cleaning up deltas needed for running queries
            // when min_history_level is finally dropped, then every HMS will commit compaction the new way
            // and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
-            for (CompactionInfo compactionInfo : readyToClean) {
-
+            for (CompactionInfo ci : readyToClean) {
              //Check for interruption before scheduling each compactionInfo and return if necessary
               checkInterrupt();
-
+              
               CompletableFuture<Void> asyncJob =
                   CompletableFuture.runAsync(
-                          ThrowingRunnable.unchecked(() -> clean(compactionInfo, cleanerWaterMark, metricsEnabled)),
-                          cleanerExecutor)
-                      .exceptionally(t -> {
-                        LOG.error("Error clearing {}", compactionInfo.getFullPartitionName(), t);
-                        return null;
-                      });
+                      ThrowingRunnable.unchecked(() -> {
+                        long minOpenTxnGLB = (ci.minOpenWriteId > 0) ? 

Review Comment:
   This variable and the method argument should not be called "global" anymore.



##########
ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java:
##########
@@ -23,6 +23,6 @@
 public class TestCleaner2 extends TestCleaner {
   @Override
   boolean useHive130DeltaDirName() {
-    return false;
+    return true;

Review Comment:
   Why was this changed? Is it related to the PR?



##########
ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java:
##########
@@ -395,6 +399,32 @@ private boolean allowOperationInATransaction(QueryPlan queryPlan) {
     return false;
   }
 
+  @Override
+  public void addWriteIdsToMinHistory(QueryPlan plan, ValidTxnWriteIdList txnWriteIds) {
+    if (plan.getInputs().isEmpty()) {
+      return;
+    }
+    Map<String, Long> writeIds = plan.getInputs().stream()
+      .filter(input -> !input.isDummy() && AcidUtils.isTransactionalTable(input.getTable()))
+      .map(input -> input.getTable().getFullyQualifiedName())
+      .collect(Collectors.toSet()).stream()

Review Comment:
   If I'm right and this is to eliminate duplicates, then you could use the distinct() method.
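
   Something like this, with the intermediate Set replaced by distinct()
   (same identifiers as in the hunk):

   ```java
   Map<String, Long> writeIds = plan.getInputs().stream()
     .filter(input -> !input.isDummy() && AcidUtils.isTransactionalTable(input.getTable()))
     .map(input -> input.getTable().getFullyQualifiedName())
     .distinct()
     .collect(Collectors.toMap(Function.identity(), table -> getMinOpenWriteId(txnWriteIds, table)));
   ```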



##########
ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java:
##########
@@ -395,6 +399,32 @@ private boolean allowOperationInATransaction(QueryPlan queryPlan) {
     return false;
   }
 
+  @Override
+  public void addWriteIdsToMinHistory(QueryPlan plan, ValidTxnWriteIdList txnWriteIds) {
+    if (plan.getInputs().isEmpty()) {
+      return;
+    }
+    Map<String, Long> writeIds = plan.getInputs().stream()
+      .filter(input -> !input.isDummy() && AcidUtils.isTransactionalTable(input.getTable()))
+      .map(input -> input.getTable().getFullyQualifiedName())
+      .collect(Collectors.toSet()).stream()
+      .collect(Collectors.toMap(Function.identity(), table -> getMinOpenWriteId(txnWriteIds, table)));
+
+    if (!writeIds.isEmpty()) {
+      try {
+        getMS().addWriteIdsToMinHistory(txnId, writeIds);
+      } catch (TException | LockException e) {
+        throw new RuntimeException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+      }
+    }
+  }
+
+  private Long getMinOpenWriteId(ValidTxnWriteIdList txnWriteIds, String table) {

Review Comment:
   The only usage of this method is above; in situations like this I prefer to inline it.
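
   The body of getMinOpenWriteId is not visible in this hunk, but assuming it
   is just a lookup through ValidTxnWriteIdList, the inlined version could
   look roughly like this (if that lookup can return null, the original
   helper presumably guards it, since Collectors.toMap rejects null values):

   ```java
   Map<String, Long> writeIds = plan.getInputs().stream()
     .filter(input -> !input.isDummy() && AcidUtils.isTransactionalTable(input.getTable()))
     .map(input -> input.getTable().getFullyQualifiedName())
     .distinct()
     .collect(Collectors.toMap(Function.identity(),
         table -> txnWriteIds.getTableValidWriteIdList(table).getMinOpenWriteId()));
   ```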



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

