This is an automated email from the ASF dual-hosted git repository.
asinkovits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 4566f07  HIVE-25716: Fix of flaky test TestCompactionMetrics#testOldestReadyForCleaningAge (Viktor Csomor, reviewed by Denys Kuzmenko and Antal Sinkovits)
4566f07 is described below
commit 4566f076a2b3d6d87b258570a373dd60f7152786
Author: Viktor Csomor <[email protected]>
AuthorDate: Thu Dec 9 11:15:45 2021 +0100
HIVE-25716: Fix of flaky test TestCompactionMetrics#testOldestReadyForCleaningAge (Viktor Csomor, reviewed by Denys Kuzmenko and Antal Sinkovits)
Closes #2837
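
Why the test was flaky: the old assertions compared the OLDEST_READY_FOR_CLEANING_AGE
gauge (an age in seconds) against elapsed-time values measured in the test plus a
fixed threshold, so a slow run or the second-granularity rounding could push the
gauge outside the accepted window. The rewrite instead brackets the gauge between
timestamps taken right before and right after the compaction is enqueued. A minimal
sketch of that bracketing idea, with hypothetical names (the committed code is in
the diff below):

    long enqueueStart = System.currentTimeMillis(); // before the entry can become "ready for cleaning"
    // ... create table, enqueue compaction, run one worker pass ...
    long enqueueEnd = System.currentTimeMillis();   // after the entry exists
    long metricsStart = System.currentTimeMillis();
    // ... run the metric service that publishes the age gauge ...
    long now = System.currentTimeMillis();
    long ageMillis = gaugeSeconds * 1000L;          // the gauge rounds down to whole seconds
    long slack = (now - metricsStart) + 1000L;      // service runtime plus rounding error
    // the enqueue window, shifted forward by the reported age, must straddle "now"
    assert enqueueStart + ageMillis < now && now < enqueueEnd + ageMillis + slack;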
---
.../hive/ql/txn/compactor/CompactorTest.java | 10 ++--
.../ql/txn/compactor/TestCompactionMetrics.java | 60 ++++++++++++++--------
2 files changed, 44 insertions(+), 26 deletions(-)
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 5dc01f9..c2b78ad 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -292,7 +292,8 @@ public abstract class CompactorTest {
     burnThroughTransactions(dbName, tblName, num, open, aborted, null);
   }
 
-  protected void burnThroughTransactions(String dbName, String tblName, int num, Set<Long> open, Set<Long> aborted, LockRequest lockReq)
+  protected void burnThroughTransactions(String dbName, String tblName, int num, Set<Long> open, Set<Long> aborted,
+      LockRequest lockReq)
       throws MetaException, NoSuchTxnException, TxnAbortedException {
     OpenTxnsResponse rsp = txnHandler.openTxns(new OpenTxnRequest(num, "me", "localhost"));
     AllocateTableWriteIdsRequest awiRqst = new AllocateTableWriteIdsRequest(dbName, tblName);
@@ -300,14 +301,15 @@ public abstract class CompactorTest {
     AllocateTableWriteIdsResponse awiResp = txnHandler.allocateTableWriteIds(awiRqst);
     int i = 0;
     for (long tid : rsp.getTxn_ids()) {
-      assert(awiResp.getTxnToWriteIds().get(i++).getTxnId() == tid);
-      if(lockReq != null) {
+      assert (awiResp.getTxnToWriteIds().get(i).getTxnId() == tid);
+      ++i;
+      if (lockReq != null) {
         lockReq.setTxnid(tid);
         txnHandler.lock(lockReq);
       }
       if (aborted != null && aborted.contains(tid)) {
         txnHandler.abortTxn(new AbortTxnRequest(tid));
-      } else if (open == null || (open != null && !open.contains(tid))) {
+      } else if (open == null || !open.contains(tid)) {
         txnHandler.commitTxn(new CommitTxnRequest(tid));
       }
     }
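
Two details in the hunk above are worth calling out. The increment moved out of
the assert because Java skips the entire assert expression when assertions are
disabled (java -da), so a side effect such as i++ inside it would silently never
run. A tiny illustration with a hypothetical list, not code from the patch:

    int i = 0;
    assert values.get(i++) != null; // under -da this whole line is skipped and i stays 0
    // safer: assert values.get(i) != null; then increment i as its own statement

The else-if change is plain boolean simplification: once open == null is false,
open != null is already known to be true, so
open == null || (open != null && !open.contains(tid)) reduces to
open == null || !open.contains(tid).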
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
index 75c722b..eda09c1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
@@ -179,41 +179,49 @@ public class TestCompactionMetrics extends CompactorTest {
   }
 
   @Test
-  @org.junit.Ignore("HIVE-25716")
   public void testOldestReadyForCleaningAge() throws Exception {
     conf.setIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA, 1);
 
-    long oldStart = System.currentTimeMillis();
-    Table old = newTable("default", "old_rfc", true);
-    Partition oldP = newPartition(old, "part");
+    final String DB_NAME = "default";
+    final String OLD_TABLE_NAME = "old_rfc";
+    final String OLD_PARTITION_NAME = "part";
+    final String YOUNG_TABLE_NAME = "young_rfc";
+    final String YOUNG_PARTITION_NAME = "part";
+
+    long oldTableStart = System.currentTimeMillis();
+    Table old = newTable(DB_NAME, OLD_TABLE_NAME, true);
+    Partition oldP = newPartition(old, OLD_PARTITION_NAME);
     addBaseFile(old, oldP, 20L, 20);
     addDeltaFile(old, oldP, 21L, 22L, 2);
     addDeltaFile(old, oldP, 23L, 24L, 2);
-    burnThroughTransactions("default", "old_rfc", 25);
-    CompactionRequest rqst = new CompactionRequest("default", "old_rfc", CompactionType.MINOR);
-    rqst.setPartitionname("ds=part");
-    txnHandler.compact(rqst);
-    startWorker();
+    burnThroughTransactions(DB_NAME, OLD_TABLE_NAME, 25);
+    doCompaction(DB_NAME, OLD_TABLE_NAME, OLD_PARTITION_NAME, CompactionType.MINOR);
+    long oldTableEnd = System.currentTimeMillis();
 
-    long youngStart = System.currentTimeMillis();
-    Table young = newTable("default", "young_rfc", true);
-    Partition youngP = newPartition(young, "part");
+    Table young = newTable(DB_NAME, YOUNG_TABLE_NAME, true);
+    Partition youngP = newPartition(young, YOUNG_PARTITION_NAME);
     addBaseFile(young, youngP, 20L, 20);
     addDeltaFile(young, youngP, 21L, 22L, 2);
     addDeltaFile(young, youngP, 23L, 24L, 2);
-    burnThroughTransactions("default", "young_rfc", 25);
-    rqst = new CompactionRequest("default", "young_rfc", CompactionType.MINOR);
-    rqst.setPartitionname("ds=part");
-    txnHandler.compact(rqst);
-    startWorker();
+    burnThroughTransactions(DB_NAME, YOUNG_TABLE_NAME, 25);
+    doCompaction(DB_NAME, YOUNG_TABLE_NAME, YOUNG_PARTITION_NAME, CompactionType.MINOR);
 
+    long acidMetricsStart = System.currentTimeMillis();
     runAcidMetricService();
-    long oldDiff = (System.currentTimeMillis() - oldStart)/1000;
-    long youngDiff = (System.currentTimeMillis() - youngStart)/1000;
+    long now = System.currentTimeMillis();
+    long acidMetricsDuration = now - acidMetricsStart;
+
+    int oldestAgeInSeconds = Metrics.getOrCreateGauge(MetricsConstants.OLDEST_READY_FOR_CLEANING_AGE)
+        .intValue();
-    long threshold = 1000;
-    Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.OLDEST_READY_FOR_CLEANING_AGE).intValue() <= oldDiff + threshold);
-    Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.OLDEST_READY_FOR_CLEANING_AGE).intValue() >= youngDiff);
+    long ageInMillies = oldestAgeInSeconds * 1000L;
+
+    long DB_ROUNDING_DOWN_ERROR = 1000L;
+    long readError = acidMetricsDuration + DB_ROUNDING_DOWN_ERROR;
+
+    long oldStartShiftedToNow = oldTableStart + ageInMillies;
+    long oldEndShiftedToNow = (oldTableEnd + ageInMillies) + readError;
+    Assert.assertTrue((oldStartShiftedToNow < now) && (now < oldEndShiftedToNow));
   }
 
   @Test
@@ -900,6 +908,14 @@ public class TestCompactionMetrics extends CompactorTest {
     t.start();
   }
 
+  private void doCompaction(String dbName, String tableName, String partitionName, CompactionType type)
+      throws Exception {
+    CompactionRequest rqst = new CompactionRequest(dbName, tableName, type);
+    rqst.setPartitionname("ds=" + partitionName);
+    txnHandler.compact(rqst);
+    startWorker();
+  }
+
   @Override
   boolean useHive130DeltaDirName() {
     return false;
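
The extracted doCompaction helper bundles the request-build, enqueue and single
worker pass that the test previously spelled out twice per table. As used in the
rewritten test, a call such as doCompaction("default", "old_rfc", "part",
CompactionType.MINOR) enqueues a minor compaction for partition ds=part and runs
one Worker pass, which is what moves the queue entry into the "ready for
cleaning" state that the age gauge measures.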