veghlaci05 commented on code in PR #4313:
URL: https://github.com/apache/hive/pull/4313#discussion_r1214012229


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java:
##########
@@ -399,7 +449,8 @@ public List<CompactionInfo> findReadyToClean(long 
minOpenTxnWaterMark, long rete
          * By filtering on minOpenTxnWaterMark, we will only cleanup after 
every transaction is committed, that could see
          * the uncompacted deltas. This way the cleaner can clean up 
everything that was made obsolete by this compaction.
          */
-        String whereClause = " WHERE \"CQ_STATE\" = " + 
quoteChar(READY_FOR_CLEANING) + 
+        String whereClause = " WHERE \"CQ_STATE\" = " + 
quoteChar(READY_FOR_CLEANING) +
+          " AND \"CQ_TYPE\" != " + quoteChar(TxnStore.ABORT_CLEANUP_TYPE) +

Review Comment:
   I think this should be kept in sync with the COMPACTOR_CLEAN_ABORTS_USING_CLEANER 
value. If that is set to false, this method should not filter out the abort-cleanup entries.



##########
ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java:
##########
@@ -320,4 +329,397 @@ public void 
testAbortedCleaningWithThreeTxnsWithDiffWriteIds() throws Exception
     List<Path> directories = getDirectories(conf, t, null);
     Assert.assertEquals(5, directories.size());
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testAbortCleanupNotUpdatingSpecificCompactionTables(boolean 
isPartitioned) throws Exception {
+    String dbName = "default", tableName = 
"abort_cleanup_not_populating_compaction_tables_test", partName = "today";
+    Table t = newTable(dbName, tableName, isPartitioned);
+    Partition p = isPartitioned ? newPartition(t, partName) : null;
+
+    // 3-aborted deltas & one committed delta
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+    addDeltaFileWithTxnComponents(t, p, 2, false);
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+
+    MetastoreConf.setBoolVar(conf, 
MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true);
+    HiveConf.setIntVar(conf, 
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+    MetadataCache metadataCache = new MetadataCache(true);
+    FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, 
ReplChangeManager.getInstance(conf), metadataCache));
+    TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, 
txnHandler, metadataCache,
+            false, mockedFSRemover));
+
+    runInitiator(conf);
+    // Initiator must not add anything to compaction_queue
+    String compactionQueuePresence = "SELECT COUNT(*) FROM 
\"COMPACTION_QUEUE\" " +
+            " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + 
tableName + "' AND \"CQ_PARTITION\"" +
+            (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL");
+    Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, 
compactionQueuePresence));
+
+    Cleaner cleaner = new Cleaner();
+    cleaner.setConf(conf);
+    cleaner.init(new AtomicBoolean(true));
+    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+    cleaner.run();
+
+    Mockito.verify(mockedFSRemover, 
Mockito.times(1)).clean(any(CleanupRequest.class));

Review Comment:
   Why is there only one clean call when there are 3 aborted txns?



##########
ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java:
##########
@@ -320,4 +329,397 @@ public void 
testAbortedCleaningWithThreeTxnsWithDiffWriteIds() throws Exception
     List<Path> directories = getDirectories(conf, t, null);
     Assert.assertEquals(5, directories.size());
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testAbortCleanupNotUpdatingSpecificCompactionTables(boolean 
isPartitioned) throws Exception {
+    String dbName = "default", tableName = 
"abort_cleanup_not_populating_compaction_tables_test", partName = "today";
+    Table t = newTable(dbName, tableName, isPartitioned);
+    Partition p = isPartitioned ? newPartition(t, partName) : null;
+
+    // 3-aborted deltas & one committed delta
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+    addDeltaFileWithTxnComponents(t, p, 2, false);
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+
+    MetastoreConf.setBoolVar(conf, 
MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true);
+    HiveConf.setIntVar(conf, 
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+    MetadataCache metadataCache = new MetadataCache(true);
+    FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, 
ReplChangeManager.getInstance(conf), metadataCache));
+    TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, 
txnHandler, metadataCache,
+            false, mockedFSRemover));
+
+    runInitiator(conf);
+    // Initiator must not add anything to compaction_queue
+    String compactionQueuePresence = "SELECT COUNT(*) FROM 
\"COMPACTION_QUEUE\" " +
+            " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + 
tableName + "' AND \"CQ_PARTITION\"" +
+            (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL");
+    Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, 
compactionQueuePresence));
+
+    Cleaner cleaner = new Cleaner();
+    cleaner.setConf(conf);
+    cleaner.init(new AtomicBoolean(true));
+    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+    cleaner.run();
+
+    Mockito.verify(mockedFSRemover, 
Mockito.times(1)).clean(any(CleanupRequest.class));
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+
+    Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, 
compactionQueuePresence));
+    Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, "SELECT 
COUNT(*) FROM \"COMPLETED_COMPACTIONS\" " +
+            " WHERE \"CC_DATABASE\" = '" + dbName+ "' AND \"CC_TABLE\" = '" + 
tableName + "' AND \"CC_PARTITION\"" +
+            (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL")));
+    Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(conf, "SELECT 
COUNT(*) FROM \"COMPLETED_TXN_COMPONENTS\" " +
+            " WHERE \"CTC_DATABASE\" = '" + dbName+ "' AND \"CTC_TABLE\" = '" 
+ tableName + "' AND \"CTC_PARTITION\"" +
+            (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL")));
+
+    List<Path> directories = getDirectories(conf, t, null);
+    // All aborted directories removed, hence 1 committed delta directory must 
be present
+    Assert.assertEquals(1, directories.size());
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testRetryEntryOnFailures(boolean isPartitioned) throws Exception 
{
+    String dbName = "default", tableName = "handler_retry_entry", partName = 
"today";
+    Table t = newTable(dbName, tableName, isPartitioned);
+    Partition p = isPartitioned ? newPartition(t, partName) : null;
+
+    // Add 2 committed deltas and 2 aborted deltas
+    addDeltaFileWithTxnComponents(t, p, 2, false);
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+    addDeltaFileWithTxnComponents(t, p, 2, false);
+
+    HiveConf.setIntVar(conf, 
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+    MetadataCache metadataCache = new MetadataCache(true);
+    FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, 
ReplChangeManager.getInstance(conf), metadataCache));
+    TxnStore mockedTxnHandler = Mockito.spy(txnHandler);
+    TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, 
mockedTxnHandler, metadataCache,
+            false, mockedFSRemover));
+    // Invoke runtime exception when calling markCleaned.
+    Mockito.doAnswer(invocationOnMock -> {
+      throw new RuntimeException("Testing retry");
+    }).when(mockedFSRemover).clean(any());
+
+    Cleaner cleaner = new Cleaner();
+    cleaner.setConf(conf);
+    cleaner.init(new AtomicBoolean(true));
+    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+    cleaner.run();
+
+    Mockito.verify(mockedTxnHandler, 
Mockito.times(1)).setAbortCleanerRetryRetentionTimeOnError(any(AbortTxnRequestInfo.class));
+    String whereClause = " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND 
\"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\"" +
+            (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL") + " AND 
\"CQ_TYPE\" = 'c' AND \"CQ_STATE\" = 'r'";
+    Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(conf, "SELECT 
COUNT(*) FROM \"COMPACTION_QUEUE\" " + whereClause));
+    String retryRetentionQuery = "SELECT \"CQ_RETRY_RETENTION\" FROM 
\"COMPACTION_QUEUE\" " + whereClause;

Review Comment:
   Wouldn't it be easier and more readable to do the assertion on the show compact 
output instead of assembling direct SQL queries?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to