veghlaci05 commented on code in PR #4313:
URL: https://github.com/apache/hive/pull/4313#discussion_r1214012229
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java:
##########
@@ -399,7 +449,8 @@ public List<CompactionInfo> findReadyToClean(long
minOpenTxnWaterMark, long rete
* By filtering on minOpenTxnWaterMark, we will only cleanup after
every transaction is committed, that could see
* the uncompacted deltas. This way the cleaner can clean up
everything that was made obsolete by this compaction.
*/
- String whereClause = " WHERE \"CQ_STATE\" = " +
quoteChar(READY_FOR_CLEANING) +
+ String whereClause = " WHERE \"CQ_STATE\" = " +
quoteChar(READY_FOR_CLEANING) +
+ " AND \"CQ_TYPE\" != " + quoteChar(TxnStore.ABORT_CLEANUP_TYPE) +
Review Comment:
I think this should be kept in sync with the COMPACTOR_CLEAN_ABORTS_USING_CLEANER
value. If that is set to false, this method should not filter out the aborts.
##########
ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java:
##########
@@ -320,4 +329,397 @@ public void
testAbortedCleaningWithThreeTxnsWithDiffWriteIds() throws Exception
List<Path> directories = getDirectories(conf, t, null);
Assert.assertEquals(5, directories.size());
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testAbortCleanupNotUpdatingSpecificCompactionTables(boolean
isPartitioned) throws Exception {
+ String dbName = "default", tableName =
"abort_cleanup_not_populating_compaction_tables_test", partName = "today";
+ Table t = newTable(dbName, tableName, isPartitioned);
+ Partition p = isPartitioned ? newPartition(t, partName) : null;
+
+ // 3-aborted deltas & one committed delta
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, false);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+
+ MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true);
+ HiveConf.setIntVar(conf,
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+ MetadataCache metadataCache = new MetadataCache(true);
+ FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf,
ReplChangeManager.getInstance(conf), metadataCache));
+ TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf,
txnHandler, metadataCache,
+ false, mockedFSRemover));
+
+ runInitiator(conf);
+ // Initiator must not add anything to compaction_queue
+ String compactionQueuePresence = "SELECT COUNT(*) FROM
\"COMPACTION_QUEUE\" " +
+ " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" +
tableName + "' AND \"CQ_PARTITION\"" +
+ (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL");
+ Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf,
compactionQueuePresence));
+
+ Cleaner cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+ cleaner.run();
+
+ Mockito.verify(mockedFSRemover,
Mockito.times(1)).clean(any(CleanupRequest.class));
Review Comment:
Why is only one clean call expected when there are 3 aborted txns?
##########
ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java:
##########
@@ -320,4 +329,397 @@ public void
testAbortedCleaningWithThreeTxnsWithDiffWriteIds() throws Exception
List<Path> directories = getDirectories(conf, t, null);
Assert.assertEquals(5, directories.size());
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testAbortCleanupNotUpdatingSpecificCompactionTables(boolean
isPartitioned) throws Exception {
+ String dbName = "default", tableName =
"abort_cleanup_not_populating_compaction_tables_test", partName = "today";
+ Table t = newTable(dbName, tableName, isPartitioned);
+ Partition p = isPartitioned ? newPartition(t, partName) : null;
+
+ // 3-aborted deltas & one committed delta
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, false);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+
+ MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true);
+ HiveConf.setIntVar(conf,
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+ MetadataCache metadataCache = new MetadataCache(true);
+ FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf,
ReplChangeManager.getInstance(conf), metadataCache));
+ TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf,
txnHandler, metadataCache,
+ false, mockedFSRemover));
+
+ runInitiator(conf);
+ // Initiator must not add anything to compaction_queue
+ String compactionQueuePresence = "SELECT COUNT(*) FROM
\"COMPACTION_QUEUE\" " +
+ " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" +
tableName + "' AND \"CQ_PARTITION\"" +
+ (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL");
+ Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf,
compactionQueuePresence));
+
+ Cleaner cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+ cleaner.run();
+
+ Mockito.verify(mockedFSRemover,
Mockito.times(1)).clean(any(CleanupRequest.class));
+ Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+
+ Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf,
compactionQueuePresence));
+ Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, "SELECT
COUNT(*) FROM \"COMPLETED_COMPACTIONS\" " +
+ " WHERE \"CC_DATABASE\" = '" + dbName+ "' AND \"CC_TABLE\" = '" +
tableName + "' AND \"CC_PARTITION\"" +
+ (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL")));
+ Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(conf, "SELECT
COUNT(*) FROM \"COMPLETED_TXN_COMPONENTS\" " +
+ " WHERE \"CTC_DATABASE\" = '" + dbName+ "' AND \"CTC_TABLE\" = '"
+ tableName + "' AND \"CTC_PARTITION\"" +
+ (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL")));
+
+ List<Path> directories = getDirectories(conf, t, null);
+ // All aborted directories removed, hence 1 committed delta directory must
be present
+ Assert.assertEquals(1, directories.size());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testRetryEntryOnFailures(boolean isPartitioned) throws Exception
{
+ String dbName = "default", tableName = "handler_retry_entry", partName =
"today";
+ Table t = newTable(dbName, tableName, isPartitioned);
+ Partition p = isPartitioned ? newPartition(t, partName) : null;
+
+ // Add 2 committed deltas and 2 aborted deltas
+ addDeltaFileWithTxnComponents(t, p, 2, false);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, false);
+
+ HiveConf.setIntVar(conf,
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+ MetadataCache metadataCache = new MetadataCache(true);
+ FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf,
ReplChangeManager.getInstance(conf), metadataCache));
+ TxnStore mockedTxnHandler = Mockito.spy(txnHandler);
+ TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf,
mockedTxnHandler, metadataCache,
+ false, mockedFSRemover));
+ // Invoke runtime exception when calling markCleaned.
+ Mockito.doAnswer(invocationOnMock -> {
+ throw new RuntimeException("Testing retry");
+ }).when(mockedFSRemover).clean(any());
+
+ Cleaner cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+ cleaner.run();
+
+ Mockito.verify(mockedTxnHandler,
Mockito.times(1)).setAbortCleanerRetryRetentionTimeOnError(any(AbortTxnRequestInfo.class));
+ String whereClause = " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND
\"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\"" +
+ (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL") + " AND
\"CQ_TYPE\" = 'c' AND \"CQ_STATE\" = 'r'";
+ Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(conf, "SELECT
COUNT(*) FROM \"COMPACTION_QUEUE\" " + whereClause));
+ String retryRetentionQuery = "SELECT \"CQ_RETRY_RETENTION\" FROM
\"COMPACTION_QUEUE\" " + whereClause;
Review Comment:
Wouldn't it be easier and more readable to do the assertion on the showcompact
output instead of assembling direct SQL queries?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]