SourabhBadhya commented on code in PR #4313:
URL: https://github.com/apache/hive/pull/4313#discussion_r1219106647


##########
ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java:
##########
@@ -320,4 +329,397 @@ public void 
testAbortedCleaningWithThreeTxnsWithDiffWriteIds() throws Exception
     List<Path> directories = getDirectories(conf, t, null);
     Assert.assertEquals(5, directories.size());
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testAbortCleanupNotUpdatingSpecificCompactionTables(boolean 
isPartitioned) throws Exception {
+    String dbName = "default", tableName = 
"abort_cleanup_not_populating_compaction_tables_test", partName = "today";
+    Table t = newTable(dbName, tableName, isPartitioned);
+    Partition p = isPartitioned ? newPartition(t, partName) : null;
+
+    // 3-aborted deltas & one committed delta
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+    addDeltaFileWithTxnComponents(t, p, 2, false);
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+
+    MetastoreConf.setBoolVar(conf, 
MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true);
+    HiveConf.setIntVar(conf, 
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+    MetadataCache metadataCache = new MetadataCache(true);
+    FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, 
ReplChangeManager.getInstance(conf), metadataCache));
+    TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, 
txnHandler, metadataCache,
+            false, mockedFSRemover));
+
+    runInitiator(conf);
+    // Initiator must not add anything to compaction_queue
+    String compactionQueuePresence = "SELECT COUNT(*) FROM 
\"COMPACTION_QUEUE\" " +
+            " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + 
tableName + "' AND \"CQ_PARTITION\"" +
+            (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL");
+    Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, 
compactionQueuePresence));
+
+    Cleaner cleaner = new Cleaner();
+    cleaner.setConf(conf);
+    cleaner.init(new AtomicBoolean(true));
+    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+    cleaner.run();
+
+    Mockito.verify(mockedFSRemover, 
Mockito.times(1)).clean(any(CleanupRequest.class));
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+
+    Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, 
compactionQueuePresence));
+    Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, "SELECT 
COUNT(*) FROM \"COMPLETED_COMPACTIONS\" " +
+            " WHERE \"CC_DATABASE\" = '" + dbName+ "' AND \"CC_TABLE\" = '" + 
tableName + "' AND \"CC_PARTITION\"" +
+            (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL")));
+    Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(conf, "SELECT 
COUNT(*) FROM \"COMPLETED_TXN_COMPONENTS\" " +
+            " WHERE \"CTC_DATABASE\" = '" + dbName+ "' AND \"CTC_TABLE\" = '" 
+ tableName + "' AND \"CTC_PARTITION\"" +
+            (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL")));
+
+    List<Path> directories = getDirectories(conf, t, null);
+    // All aborted directories removed, hence 1 committed delta directory must 
be present
+    Assert.assertEquals(1, directories.size());
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testRetryEntryOnFailures(boolean isPartitioned) throws Exception 
{
+    String dbName = "default", tableName = "handler_retry_entry", partName = 
"today";
+    Table t = newTable(dbName, tableName, isPartitioned);
+    Partition p = isPartitioned ? newPartition(t, partName) : null;
+
+    // Add 2 committed deltas and 2 aborted deltas
+    addDeltaFileWithTxnComponents(t, p, 2, false);
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+    addDeltaFileWithTxnComponents(t, p, 2, false);
+
+    HiveConf.setIntVar(conf, 
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+    MetadataCache metadataCache = new MetadataCache(true);
+    FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, 
ReplChangeManager.getInstance(conf), metadataCache));
+    TxnStore mockedTxnHandler = Mockito.spy(txnHandler);
+    TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, 
mockedTxnHandler, metadataCache,
+            false, mockedFSRemover));
+    // Invoke runtime exception when calling markCleaned.
+    Mockito.doAnswer(invocationOnMock -> {
+      throw new RuntimeException("Testing retry");
+    }).when(mockedFSRemover).clean(any());
+
+    Cleaner cleaner = new Cleaner();
+    cleaner.setConf(conf);
+    cleaner.init(new AtomicBoolean(true));
+    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+    cleaner.run();
+
+    Mockito.verify(mockedTxnHandler, 
Mockito.times(1)).setAbortCleanerRetryRetentionTimeOnError(any(AbortTxnRequestInfo.class));
+    String whereClause = " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND 
\"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\"" +
+            (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL") + " AND 
\"CQ_TYPE\" = 'c' AND \"CQ_STATE\" = 'r'";
+    Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(conf, "SELECT 
COUNT(*) FROM \"COMPACTION_QUEUE\" " + whereClause));
+    String retryRetentionQuery = "SELECT \"CQ_RETRY_RETENTION\" FROM 
\"COMPACTION_QUEUE\" " + whereClause;

Review Comment:
   Used showCompact output. Done.



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java:
##########
@@ -118,10 +120,16 @@ private void clean(CompactionInfo info, long 
minOpenWriteTxn, boolean metricsEna
       abortCleanUsingAcidDir(info, location, minOpenWriteTxn);
 
     } catch (InterruptedException e) {
+      LOG.error("Caught an interrupted exception when cleaning, unable to 
complete cleaning of {} due to {}", info,
+              e.getMessage());
+      info.errorMessage = e.getMessage();
+      handleCleanerAttemptFailure(info);
       throw e;
     } catch (Exception e) {
       LOG.error("Caught exception when cleaning, unable to complete cleaning 
of {} due to {}", info,
               e.getMessage());
+      info.errorMessage = e.getMessage();
+      handleCleanerAttemptFailure(info);

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to