[ 
https://issues.apache.org/jira/browse/HIVE-26704?focusedWorklogId=847813&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-847813
 ]

ASF GitHub Bot logged work on HIVE-26704:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Feb/23 12:26
            Start Date: 27/Feb/23 12:26
    Worklog Time Spent: 10m 
      Work Description: veghlaci05 commented on code in PR #3576:
URL: https://github.com/apache/hive/pull/3576#discussion_r1118536472


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -5894,6 +5902,63 @@ private void addTxnToMinHistoryLevel(Connection dbConn, List<Long> txnIds, long
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public void addWriteIdsToMinHistory(long txnid, Map<String, Long> minOpenWriteIds) throws MetaException {
+    if (!useMinHistoryWriteId) {
+      return;
+    }
+    // Need to register minimum open writeId for current transactions into MIN_HISTORY_WRITE_ID table.
+    try {
+      Connection dbConn = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);

Review Comment:
   Try with resources instead?
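As a minimal sketch of what try-with-resources buys here (a toy AutoCloseable stands in for the JDBC Connection; all names below are illustrative, not from the PR):

```java
import java.util.ArrayList;
import java.util.List;

public class TryWithResourcesDemo {
    // Toy stand-in for a JDBC Connection; records its lifecycle for demonstration.
    static class FakeConnection implements AutoCloseable {
        final List<String> log;
        FakeConnection(List<String> log) { this.log = log; log.add("open"); }
        void insert(String row) { log.add("insert " + row); }
        @Override public void close() { log.add("close"); }
    }

    // try-with-resources closes the resource automatically on every exit path,
    // so the manual "Connection dbConn = null;" plus finally-block bookkeeping
    // goes away.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        try (FakeConnection conn = new FakeConnection(log)) {
            conn.insert("a");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [open, insert a, close]
    }
}
```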



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -5894,6 +5902,63 @@ private void addTxnToMinHistoryLevel(Connection dbConn, List<Long> txnIds, long
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public void addWriteIdsToMinHistory(long txnid, Map<String, Long> minOpenWriteIds) throws MetaException {
+    if (!useMinHistoryWriteId) {
+      return;
+    }
+    // Need to register minimum open writeId for current transactions into MIN_HISTORY_WRITE_ID table.
+    try {
+      Connection dbConn = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        try (PreparedStatement pstmt = dbConn.prepareStatement(MIN_HISTORY_WRITE_ID_INSERT_QUERY)) {
+          int writeId = 0;
+
+          for (Map.Entry<String, Long> validWriteId : minOpenWriteIds.entrySet()) {
+            String[] names = TxnUtils.getDbTableName(validWriteId.getKey());
+
+            pstmt.setLong(1, txnid);
+            pstmt.setString(2, names[0]);
+            pstmt.setString(3, names[1]);
+            pstmt.setLong(4, validWriteId.getValue());
+
+            pstmt.addBatch();
+            writeId++;
+            if (writeId % maxBatchSize == 0) {
+              LOG.debug("Executing a batch of <" + TXN_TO_WRITE_ID_INSERT_QUERY + "> queries. " +
+                "Batch size: " + maxBatchSize);
+              pstmt.executeBatch();
+            }
+          }
+          if (writeId % maxBatchSize != 0) {
+            LOG.debug("Executing a batch of <" + TXN_TO_WRITE_ID_INSERT_QUERY + "> queries. " +
+              "Batch size: " + writeId % maxBatchSize);
+            pstmt.executeBatch();
+          }
+        }

Review Comment:
   Maybe a utility method which accepts a lambda with the PreparedStatement setters? I see this code pattern again and again in this class. Something like `void executeInBatch(PreparedStatement statement, Iterable list, Consumer batchCreator)`.
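One possible shape for such a helper, decoupled from JDBC so the sketch stays self-contained (the names `executeInBatch` and the flush callback are hypothetical; in TxnHandler the `batchCreator` would call `pstmt.setXxx(...)`/`addBatch()` and `flush` would call `pstmt.executeBatch()`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchHelper {
    // Generic batching loop: binds each element via batchCreator, flushes every
    // maxBatchSize elements, and flushes the trailing partial batch at the end.
    static <T> int executeInBatch(Iterable<T> list, int maxBatchSize,
                                  Consumer<T> batchCreator, Runnable flush) {
        int count = 0;
        for (T element : list) {
            batchCreator.accept(element);
            count++;
            if (count % maxBatchSize == 0) {
                flush.run();
            }
        }
        if (count % maxBatchSize != 0) {
            flush.run(); // trailing partial batch
        }
        return count;
    }

    public static void main(String[] args) {
        List<Integer> flushSizes = new ArrayList<>();
        int[] pending = {0};
        executeInBatch(List.of("a", "b", "c", "d", "e"), 2,
            e -> pending[0]++,
            () -> { flushSizes.add(pending[0]); pending[0] = 0; });
        System.out.println(flushSizes); // [2, 2, 1]
    }
}
```

This keeps the modulo bookkeeping in one place, so callers only supply how a single element is bound and how a batch is flushed.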



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java:
##########
@@ -140,41 +140,36 @@ public void run() {
                     HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_DURATION_UPDATE_INTERVAL, TimeUnit.MILLISECONDS),
                     new CleanerCycleUpdater(MetricsConstants.COMPACTION_CLEANER_CYCLE_DURATION, startedAt));
           }
-
           long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-
           checkInterrupt();
 
           List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
-
           checkInterrupt();
 
           if (!readyToClean.isEmpty()) {
-            long minTxnIdSeenOpen = txnHandler.findMinTxnIdSeenOpen();
-            final long cleanerWaterMark =
-                minTxnIdSeenOpen < 0 ? minOpenTxnId : Math.min(minOpenTxnId, minTxnIdSeenOpen);
-
-            LOG.info("Cleaning based on min open txn id: " + cleanerWaterMark);
             List<CompletableFuture<Void>> cleanerList = new ArrayList<>();
             // For checking which compaction can be cleaned we can use the minOpenTxnId
             // However findReadyToClean will return all records that were compacted with old version of HMS
             // where the CQ_NEXT_TXN_ID is not set. For these compactions we need to provide minTxnIdSeenOpen
             // to the clean method, to avoid cleaning up deltas needed for running queries
             // when min_history_level is finally dropped, than every HMS will commit compaction the new way
             // and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
-            for (CompactionInfo compactionInfo : readyToClean) {
-
+            for (CompactionInfo ci : readyToClean) {
               //Check for interruption before scheduling each compactionInfo and return if necessary
               checkInterrupt();
-
+
               CompletableFuture<Void> asyncJob =
                   CompletableFuture.runAsync(
-                          ThrowingRunnable.unchecked(() -> clean(compactionInfo, cleanerWaterMark, metricsEnabled)),
-                          cleanerExecutor)
-                      .exceptionally(t -> {
-                        LOG.error("Error clearing {}", compactionInfo.getFullPartitionName(), t);
-                        return null;
-                      });
+                      ThrowingRunnable.unchecked(() -> {
+                        long minOpenTxnGLB = (ci.minOpenWriteId > 0) ? 

Review Comment:
   This variable and the method argument should not be called "global" anymore.



##########
ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java:
##########
@@ -23,6 +23,6 @@
 public class TestCleaner2 extends TestCleaner {
   @Override
   boolean useHive130DeltaDirName() {
-    return false;
+    return true;

Review Comment:
   Why this change? Is it related to the PR?



##########
ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java:
##########
@@ -395,6 +399,32 @@ private boolean allowOperationInATransaction(QueryPlan queryPlan) {
     return false;
   }
 
+  @Override
+  public void addWriteIdsToMinHistory(QueryPlan plan, ValidTxnWriteIdList txnWriteIds) {
+    if (plan.getInputs().isEmpty()) {
+      return;
+    }
+    Map<String, Long> writeIds = plan.getInputs().stream()
+      .filter(input -> !input.isDummy() && AcidUtils.isTransactionalTable(input.getTable()))
+      .map(input -> input.getTable().getFullyQualifiedName())
+      .collect(Collectors.toSet()).stream()

Review Comment:
   If I'm right and this is to eliminate duplicates, then you could use the distinct() method.
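A small illustration of the suggestion (toy data; in the real code the `.collect(Collectors.toMap(...))` step would simply follow `distinct()` instead of the intermediate `toSet()` plus second `stream()` call):

```java
import java.util.List;
import java.util.stream.Collectors;

public class DistinctDemo {
    // distinct() removes duplicates in one stream step, instead of
    // collect(Collectors.toSet()) followed by a second stream() call,
    // and preserves encounter order for ordered streams.
    static List<String> dedupe(List<String> tables) {
        return tables.stream().distinct().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(dedupe(List.of("db.t1", "db.t2", "db.t1"))); // [db.t1, db.t2]
    }
}
```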



##########
ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java:
##########
@@ -395,6 +399,32 @@ private boolean allowOperationInATransaction(QueryPlan queryPlan) {
     return false;
   }
 
+  @Override
+  public void addWriteIdsToMinHistory(QueryPlan plan, ValidTxnWriteIdList txnWriteIds) {
+    if (plan.getInputs().isEmpty()) {
+      return;
+    }
+    Map<String, Long> writeIds = plan.getInputs().stream()
+      .filter(input -> !input.isDummy() && AcidUtils.isTransactionalTable(input.getTable()))
+      .map(input -> input.getTable().getFullyQualifiedName())
+      .collect(Collectors.toSet()).stream()
+      .collect(Collectors.toMap(Function.identity(), table -> getMinOpenWriteId(txnWriteIds, table)));
+
+    if (!writeIds.isEmpty()) {
+      try {
+        getMS().addWriteIdsToMinHistory(txnId, writeIds);
+      } catch (TException | LockException e) {
+        throw new RuntimeException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+      }
+    }
+  }
+
+  private Long getMinOpenWriteId(ValidTxnWriteIdList txnWriteIds, String table) {

Review Comment:
   The only usage of this method is the one above; in situations like this I prefer to inline it.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 847813)
    Time Spent: 3h 50m  (was: 3h 40m)

> Cleaner shouldn't be blocked by global min open txnId
> -----------------------------------------------------
>
>                 Key: HIVE-26704
>                 URL: https://issues.apache.org/jira/browse/HIVE-26704
>             Project: Hive
>          Issue Type: Task
>            Reporter: Denys Kuzmenko
>            Assignee: Denys Kuzmenko
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> *Single transaction blocks cluster-wide Cleaner operations*
> Currently, a single long-running transaction can prevent the Cleaner from cleaning 
> up any tables. This causes file buildup in tables, which incurs performance 
> penalties when listing the directories (note that compaction itself is not blocked 
> by this, so unnecessary data is not read, but the files remain, which still costs 
> performance). 
> We can reduce the set of files protected by an open transaction if we store 
> query-table correlation data in the backend DB, but this change requires 
> revisiting the current method of recording that detail. 
> The naive and somewhat backward-compatible approach is to capture the 
> minOpenWriteIds per table. It involves a non-mutation operation (as in, there 
> is no need for the HMS DB to wait for another user's operation to record it). 
> This does add data writes to the HMS backend DB, but it is a blind 
> insert operation that can be group-committed across many users. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
