This is an automated email from the ASF dual-hosted git repository.

asinkovits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 4566f07  HIVE-25716: Fix of flaky test TestCompactionMetrics#testOldestReadyForCleaningAge (Viktor Csomor, reviewed by Denys Kuzmenko and Antal Sinkovits)
4566f07 is described below

commit 4566f076a2b3d6d87b258570a373dd60f7152786
Author: Viktor Csomor <[email protected]>
AuthorDate: Thu Dec 9 11:15:45 2021 +0100

    HIVE-25716: Fix of flaky test TestCompactionMetrics#testOldestReadyForCleaningAge (Viktor Csomor, reviewed by Denys Kuzmenko and Antal Sinkovits)
    
    Closes #2837
---
 .../hive/ql/txn/compactor/CompactorTest.java       | 10 ++--
 .../ql/txn/compactor/TestCompactionMetrics.java    | 60 ++++++++++++++--------
 2 files changed, 44 insertions(+), 26 deletions(-)

diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 5dc01f9..c2b78ad 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -292,7 +292,8 @@ public abstract class CompactorTest {
     burnThroughTransactions(dbName, tblName, num, open, aborted, null);
   }
 
-  protected void burnThroughTransactions(String dbName, String tblName, int 
num, Set<Long> open, Set<Long> aborted, LockRequest lockReq)
+  protected void burnThroughTransactions(String dbName, String tblName, int 
num, Set<Long> open, Set<Long> aborted,
+      LockRequest lockReq)
       throws MetaException, NoSuchTxnException, TxnAbortedException {
     OpenTxnsResponse rsp = txnHandler.openTxns(new OpenTxnRequest(num, "me", 
"localhost"));
     AllocateTableWriteIdsRequest awiRqst = new 
AllocateTableWriteIdsRequest(dbName, tblName);
@@ -300,14 +301,15 @@ public abstract class CompactorTest {
     AllocateTableWriteIdsResponse awiResp = 
txnHandler.allocateTableWriteIds(awiRqst);
     int i = 0;
     for (long tid : rsp.getTxn_ids()) {
-      assert(awiResp.getTxnToWriteIds().get(i++).getTxnId() == tid);
-      if(lockReq != null) {
+      assert (awiResp.getTxnToWriteIds().get(i).getTxnId() == tid);
+      ++i;
+      if (lockReq != null) {
         lockReq.setTxnid(tid);
         txnHandler.lock(lockReq);
       }
       if (aborted != null && aborted.contains(tid)) {
         txnHandler.abortTxn(new AbortTxnRequest(tid));
-      } else if (open == null || (open != null && !open.contains(tid))) {
+      } else if (open == null || !open.contains(tid)) {
         txnHandler.commitTxn(new CommitTxnRequest(tid));
       }
     }
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
index 75c722b..eda09c1 100644
--- 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
@@ -179,41 +179,49 @@ public class TestCompactionMetrics  extends CompactorTest 
{
   }
 
   @Test
-  @org.junit.Ignore("HIVE-25716")
   public void testOldestReadyForCleaningAge() throws Exception {
     conf.setIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA, 1);
 
-    long oldStart = System.currentTimeMillis();
-    Table old = newTable("default", "old_rfc", true);
-    Partition oldP = newPartition(old, "part");
+    final String DB_NAME = "default";
+    final String OLD_TABLE_NAME = "old_rfc";
+    final String OLD_PARTITION_NAME = "part";
+    final String YOUNG_TABLE_NAME = "young_rfc";
+    final String YOUNG_PARTITION_NAME = "part";
+
+    long oldTableStart = System.currentTimeMillis();
+    Table old = newTable(DB_NAME, OLD_TABLE_NAME, true);
+    Partition oldP = newPartition(old, OLD_PARTITION_NAME);
     addBaseFile(old, oldP, 20L, 20);
     addDeltaFile(old, oldP, 21L, 22L, 2);
     addDeltaFile(old, oldP, 23L, 24L, 2);
-    burnThroughTransactions("default", "old_rfc", 25);
-    CompactionRequest rqst = new CompactionRequest("default", "old_rfc", 
CompactionType.MINOR);
-    rqst.setPartitionname("ds=part");
-    txnHandler.compact(rqst);
-    startWorker();
+    burnThroughTransactions(DB_NAME, OLD_TABLE_NAME, 25);
+    doCompaction(DB_NAME, OLD_TABLE_NAME, OLD_PARTITION_NAME, 
CompactionType.MINOR);
+    long oldTableEnd = System.currentTimeMillis();
 
-    long youngStart = System.currentTimeMillis();
-    Table young = newTable("default", "young_rfc", true);
-    Partition youngP = newPartition(young, "part");
+    Table young = newTable(DB_NAME, YOUNG_TABLE_NAME, true);
+    Partition youngP = newPartition(young, YOUNG_PARTITION_NAME);
     addBaseFile(young, youngP, 20L, 20);
     addDeltaFile(young, youngP, 21L, 22L, 2);
     addDeltaFile(young, youngP, 23L, 24L, 2);
-    burnThroughTransactions("default", "young_rfc", 25);
-    rqst = new CompactionRequest("default", "young_rfc", CompactionType.MINOR);
-    rqst.setPartitionname("ds=part");
-    txnHandler.compact(rqst);
-    startWorker();
+    burnThroughTransactions(DB_NAME, YOUNG_TABLE_NAME, 25);
+    doCompaction(DB_NAME, YOUNG_TABLE_NAME, YOUNG_PARTITION_NAME, 
CompactionType.MINOR);
 
+    long acidMetricsStart = System.currentTimeMillis();
     runAcidMetricService();
-    long oldDiff = (System.currentTimeMillis() - oldStart)/1000;
-    long youngDiff = (System.currentTimeMillis() - youngStart)/1000;
+    long now = System.currentTimeMillis();
+    long acidMetricsDuration = now - acidMetricsStart;
+
+    int oldestAgeInSeconds = 
Metrics.getOrCreateGauge(MetricsConstants.OLDEST_READY_FOR_CLEANING_AGE)
+        .intValue();
 
-    long threshold = 1000;
-    
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.OLDEST_READY_FOR_CLEANING_AGE).intValue()
 <= oldDiff + threshold);
-    
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.OLDEST_READY_FOR_CLEANING_AGE).intValue()
 >= youngDiff);
+    long ageInMillies = oldestAgeInSeconds * 1000L;
+
+    long DB_ROUNDING_DOWN_ERROR = 1000L;
+    long readError = acidMetricsDuration + DB_ROUNDING_DOWN_ERROR;
+
+    long oldStartShiftedToNow = oldTableStart + ageInMillies;
+    long oldEndShiftedToNow = (oldTableEnd + ageInMillies) + readError;
+    Assert.assertTrue((oldStartShiftedToNow < now) && (now < 
oldEndShiftedToNow));
   }
 
   @Test
@@ -900,6 +908,14 @@ public class TestCompactionMetrics  extends CompactorTest {
     t.start();
   }
 
+  private void doCompaction(String dbName, String tableName, String 
partitionName, CompactionType type)
+      throws Exception {
+    CompactionRequest rqst = new CompactionRequest(dbName, tableName, type);
+    rqst.setPartitionname("ds=" + partitionName);
+    txnHandler.compact(rqst);
+    startWorker();
+  }
+
   @Override
   boolean useHive130DeltaDirName() {
     return false;

Reply via email to