This is an automated email from the ASF dual-hosted git repository.

kuczoram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 83d98f42fc7 HIVE-29571: ACID Compaction: Marking the compaction as compacted should happen after its txn got committed (#6511)
83d98f42fc7 is described below

commit 83d98f42fc7374478e31a3e389be5981e7d489d2
Author: Marta Kuczora <[email protected]>
AuthorDate: Fri May 29 09:57:56 2026 +0200

    HIVE-29571: ACID Compaction: Marking the compaction as compacted should happen after its txn got committed (#6511)
---
 .../compactor/service/AcidCompactionService.java   |  49 +-
 .../hadoop/hive/ql/txn/compactor/TestWorker.java   | 529 ++++++++++++++-------
 2 files changed, 387 insertions(+), 191 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
index d1c7e3972d0..cf6e4c02e4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
@@ -48,6 +48,7 @@
 import org.apache.hadoop.hive.ql.txn.compactor.CompactorFactory;
 import org.apache.hadoop.hive.ql.txn.compactor.CompactorPipeline;
 import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable;
 import org.apache.hadoop.hive.ql.txn.compactor.QueryCompactor;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.common.util.Ref;
@@ -211,32 +212,27 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception {
 
       // Don't start compaction or cleaning if not necessary
       if (isDynPartAbort(table, ci)) {
-        msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
-        compactionTxn.wasSuccessful();
+        compactionTxn.markForCommit(() -> msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)));
         return false;
       }
       dir = getAcidStateForWorker(ci, sd, tblValidWriteIds);
       if (!isEnoughToCompact(ci, dir, sd)) {
         if (needsCleaning(dir, sd)) {
-          msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
+          compactionTxn.markForCommit(() -> msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)));
         } else {
           // do nothing
           ci.errorMessage = "None of the compaction thresholds met, compaction request is refused!";
           LOG.debug(ci.errorMessage + " Compaction info: {}", ci);
-          msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
+          compactionTxn.markForCommit(() -> msc.markRefused(CompactionInfo.compactionInfoToStruct(ci)));
+
         }
-        compactionTxn.wasSuccessful();
         return false;
       }
       if (!ci.isMajorCompaction() && !CompactorUtil.isMinorCompactionSupported(conf, table.getParameters(), dir)) {
         ci.errorMessage = "Query based Minor compaction is not possible for full acid tables having raw format " +
             "(non-acid) data in them.";
         LOG.error(ci.errorMessage + " Compaction info: {}", ci);
-        try {
-          msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
-        } catch (Throwable tr) {
-          LOG.error("Caught an exception while trying to mark compaction {} as failed: {}", ci, tr);
-        }
+        compactionTxn.markForAbort(() -> msc.markRefused(CompactionInfo.compactionInfoToStruct(ci)));
         return false;
       }
       CompactorUtil.checkInterrupt(CLASS_NAME);
@@ -261,8 +257,7 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception {
 
         LOG.info("Completed " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + " in "
             + compactionTxn + ", marking as compacted.");
-        msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
-        compactionTxn.wasSuccessful();
+        compactionTxn.markForCommit(() -> msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)));
 
         AcidMetricService.updateMetricsFromWorker(ci.dbname, ci.tableName, ci.partName, ci.type,
             dir.getCurrentDirectories().size(), dir.getDeleteDeltas().size(), conf, msc);
@@ -346,7 +341,11 @@ class CompactionTxn implements AutoCloseable {
     private long lockId = 0;
 
     private TxnStatus status = TxnStatus.UNKNOWN;
-    private boolean successfulCompaction = false;
+
+    private ThrowingRunnable onCommitSuccess;
+    private ThrowingRunnable onAbortSuccess;
+
+    private boolean rollbackOnly = true;
 
     /**
      * Try to open a new txn.
@@ -377,11 +376,13 @@ private LockRequest createLockRequest(CompactionInfo ci) {
       return CompactorUtil.createLockRequest(conf, ci, txnId, lockAndOpType.getKey(), lockAndOpType.getValue());
     }
 
-    /**
-     * Mark compaction as successful. This means the txn will be committed; otherwise it will be aborted.
-     */
-    void wasSuccessful() {
-      this.successfulCompaction = true;
+    void markForCommit(ThrowingRunnable action) {
+      this.rollbackOnly = false;
+      this.onCommitSuccess = action;
+    }
+
+    void markForAbort(ThrowingRunnable action) {
+      this.onAbortSuccess = action;
     }
 
     /**
@@ -396,10 +397,16 @@ public void close() throws Exception {
         //the transaction is about to close, we can stop heartbeating regardless of it's state
         CompactionHeartbeatService.getInstance(conf).stopHeartbeat(txnId);
       } finally {
-        if (successfulCompaction) {
-          commit();
-        } else {
+        if (rollbackOnly) {
           abort();
+          if (onAbortSuccess != null) {
+            onAbortSuccess.run();
+          }
+          return;
+        }
+        commit();
+        if (onCommitSuccess != null) {
+          onCommitSuccess.run();
         }
       }
     }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index bf01034711e..f97237353f7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.FindNextCompactRequest;
@@ -41,7 +42,10 @@
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.utils.StringableMap;
+import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -70,8 +74,13 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.hive.common.AcidConstants.VISIBILITY_PATTERN;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -100,9 +109,9 @@ public void stringableMap() throws Exception {
     // Empty map case
     StringableMap m = new StringableMap(new HashMap<String, String>());
     String s = m.toString();
-    Assert.assertEquals("0:", s);
+    assertEquals("0:", s);
     m = new StringableMap(s);
-    Assert.assertEquals(0, m.size());
+    assertEquals(0, m.size());
 
     Map<String, String> base = new HashMap<String, String>();
     base.put("mary", "poppins");
@@ -111,22 +120,22 @@ public void stringableMap() throws Exception {
     m = new StringableMap(base);
     s = m.toString();
     m = new StringableMap(s);
-    Assert.assertEquals(3, m.size());
+    assertEquals(3, m.size());
     Map<String, Boolean> saw = new HashMap<String, Boolean>(3);
     saw.put("mary", false);
     saw.put("bert", false);
     saw.put(null, false);
     for (Map.Entry<String, String> e : m.entrySet()) {
       saw.put(e.getKey(), true);
-      if ("mary".equals(e.getKey())) Assert.assertEquals("poppins", 
e.getValue());
+      if ("mary".equals(e.getKey())) assertEquals("poppins", e.getValue());
       else if ("bert".equals(e.getKey())) Assert.assertNull(e.getValue());
-      else if (null == e.getKey()) Assert.assertEquals("banks", e.getValue());
+      else if (null == e.getKey()) assertEquals("banks", e.getValue());
       else Assert.fail("Unexpected value " + e.getKey());
     }
-    Assert.assertEquals(3, saw.size());
-    Assert.assertTrue(saw.get("mary"));
-    Assert.assertTrue(saw.get("bert"));
-    Assert.assertTrue(saw.get(null));
+    assertEquals(3, saw.size());
+    assertTrue(saw.get("mary"));
+    assertTrue(saw.get("bert"));
+    assertTrue(saw.get(null));
    }
 
   @Test
@@ -134,26 +143,26 @@ public void stringableList() throws Exception {
     // Empty list case
     MRCompactor.StringableList ls = new MRCompactor.StringableList();
     String s = ls.toString();
-    Assert.assertEquals("0:", s);
+    assertEquals("0:", s);
     ls = new MRCompactor.StringableList(s);
-    Assert.assertEquals(0, ls.size());
+    assertEquals(0, ls.size());
 
     ls = new MRCompactor.StringableList();
     ls.add(new Path("/tmp"));
     ls.add(new Path("/usr"));
     s = ls.toString();
-    Assert.assertTrue("Expected 2:4:/tmp4:/usr or 2:4:/usr4:/tmp, got " + s,
+    assertTrue("Expected 2:4:/tmp4:/usr or 2:4:/usr4:/tmp, got " + s,
         "2:4:/tmp4:/usr".equals(s) || "2:4:/usr4:/tmp".equals(s));
     ls = new MRCompactor.StringableList(s);
-    Assert.assertEquals(2, ls.size());
+    assertEquals(2, ls.size());
     boolean sawTmp = false, sawUsr = false;
     for (Path p : ls) {
       if ("/tmp".equals(p.toString())) sawTmp = true;
       else if ("/usr".equals(p.toString())) sawUsr = true;
       else Assert.fail("Unexpected path " + p.toString());
     }
-    Assert.assertTrue(sawTmp);
-    Assert.assertTrue(sawUsr);
+    assertTrue(sawTmp);
+    assertTrue(sawUsr);
   }
 
   @Test
@@ -181,10 +190,10 @@ public void inputSplit() throws Exception {
     MRCompactor.CompactorInputSplit split =
         new MRCompactor.CompactorInputSplit(conf, 3, files, new 
Path(basename), deltas, new HashMap<String, Integer>());
 
-    Assert.assertEquals(520L, split.getLength());
+    assertEquals(520L, split.getLength());
     String[] locations = split.getLocations();
-    Assert.assertEquals(1, locations.length);
-    Assert.assertEquals("localhost", locations[0]);
+    assertEquals(1, locations.length);
+    assertEquals("localhost", locations[0]);
 
     ByteArrayOutputStream buf = new ByteArrayOutputStream();
     DataOutput out = new DataOutputStream(buf);
@@ -194,12 +203,12 @@ public void inputSplit() throws Exception {
     DataInput in = new DataInputStream(new 
ByteArrayInputStream(buf.toByteArray()));
     split.readFields(in);
 
-    Assert.assertEquals(3, split.getBucket());
-    Assert.assertEquals(basename, split.getBaseDir().toString());
+    assertEquals(3, split.getBucket());
+    assertEquals(basename, split.getBaseDir().toString());
     deltas = split.getDeltaDirs();
-    Assert.assertEquals(2, deltas.length);
-    Assert.assertEquals(delta1, deltas[0].toString());
-    Assert.assertEquals(delta2, deltas[1].toString());
+    assertEquals(2, deltas.length);
+    assertEquals(delta1, deltas[0].toString());
+    assertEquals(delta2, deltas[1].toString());
   }
 
   @Test
@@ -234,12 +243,12 @@ public void inputSplitNullBase() throws Exception {
     DataInput in = new DataInputStream(new 
ByteArrayInputStream(buf.toByteArray()));
     split.readFields(in);
 
-    Assert.assertEquals(3, split.getBucket());
+    assertEquals(3, split.getBucket());
     Assert.assertNull(split.getBaseDir());
     deltas = split.getDeltaDirs();
-    Assert.assertEquals(2, deltas.length);
-    Assert.assertEquals(delta1, deltas[0].toString());
-    Assert.assertEquals(delta2, deltas[1].toString());
+    assertEquals(2, deltas.length);
+    assertEquals(delta1, deltas[0].toString());
+    assertEquals(delta2, deltas[1].toString());
   }
 
   @Test
@@ -264,7 +273,7 @@ public void sortedTable() throws Exception {
     // There should still be four directories in the location.
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(4, stat.length);
+    assertEquals(4, stat.length);
   }
 
   @Test
@@ -291,7 +300,7 @@ public void sortedPartition() throws Exception {
     // There should still be four directories in the location.
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
-    Assert.assertEquals(4, stat.length);
+    assertEquals(4, stat.length);
   }
 
   @Test
@@ -312,13 +321,13 @@ public void minorTableWithBase() throws Exception {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+    assertEquals(1, compacts.size());
+    assertEquals("ready for cleaning", compacts.get(0).getState());
 
     // There should still now be 5 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(5, stat.length);
+    assertEquals(5, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     boolean sawNewDelta = false;
@@ -326,26 +335,26 @@ public void minorTableWithBase() throws Exception {
       if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24) 
+ "_v0000026")) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), 
FileUtils.HIDDEN_FILES_PATH_FILTER);
-        Assert.assertEquals(2, buckets.length);
-        
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-        
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(104L, buckets[0].getLen());
-        Assert.assertEquals(104L, buckets[1].getLen());
+        assertEquals(2, buckets.length);
+        assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        assertEquals(104L, buckets[0].getLen());
+        assertEquals(104L, buckets[1].getLen());
       }
       if 
(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24) + 
"_v0000026")) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), 
FileUtils.HIDDEN_FILES_PATH_FILTER);
-        Assert.assertEquals(2, buckets.length);
-        
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-        
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(104L, buckets[0].getLen());
-        Assert.assertEquals(104L, buckets[1].getLen());
+        assertEquals(2, buckets.length);
+        assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        assertEquals(104L, buckets[0].getLen());
+        assertEquals(104L, buckets[1].getLen());
       }
       else {
         LOG.debug("This is not the delta file you are looking for " + 
stat[i].getPath().getName());
       }
     }
-    Assert.assertTrue(toString(stat), sawNewDelta);
+    assertTrue(toString(stat), sawNewDelta);
   }
 
   /**
@@ -372,20 +381,20 @@ public void minorWithOpenInMiddle() throws Exception {
     // since compaction was not run, state should not be "ready for cleaning" 
but "refused"
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(0).getState());
+    assertEquals(1, compacts.size());
+    assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(0).getState());
 
     // There should still be 4 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(toString(stat), 4, stat.length);
+    assertEquals(toString(stat), 4, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     Arrays.sort(stat);
-    Assert.assertEquals("base_20", stat[0].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(21, 22), stat[1].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(23, 25), stat[2].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(26, 27), stat[3].getPath().getName());
+    assertEquals("base_20", stat[0].getPath().getName());
+    assertEquals(makeDeltaDirName(21, 22), stat[1].getPath().getName());
+    assertEquals(makeDeltaDirName(23, 25), stat[2].getPath().getName());
+    assertEquals(makeDeltaDirName(26, 27), stat[3].getPath().getName());
   }
 
   @Test
@@ -407,22 +416,22 @@ public void minorWithAborted() throws Exception {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+    assertEquals(1, compacts.size());
+    assertEquals("ready for cleaning", compacts.get(0).getState());
 
     // There should still now be 6 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(6, stat.length);
+    assertEquals(6, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     Arrays.sort(stat);
-    Assert.assertEquals("base_20", stat[0].getPath().getName());
-    Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 27) + "_v0000028", 
stat[1].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
-    Assert.assertEquals(makeDeltaDirNameCompacted(21, 27) + "_v0000028", 
stat[3].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(23, 25), stat[4].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(26, 27), stat[5].getPath().getName());
+    assertEquals("base_20", stat[0].getPath().getName());
+    assertEquals(makeDeleteDeltaDirNameCompacted(21, 27) + "_v0000028", 
stat[1].getPath().getName());
+    assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
+    assertEquals(makeDeltaDirNameCompacted(21, 27) + "_v0000028", 
stat[3].getPath().getName());
+    assertEquals(makeDeltaDirName(23, 25), stat[4].getPath().getName());
+    assertEquals(makeDeltaDirName(26, 27), stat[5].getPath().getName());
   }
 
   @Test
@@ -444,13 +453,13 @@ public void minorPartitionWithBase() throws Exception {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+    assertEquals(1, compacts.size());
+    assertEquals("ready for cleaning", compacts.get(0).getState());
 
     // There should still be four directories in the location.
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
-    Assert.assertEquals(5, stat.length);
+    assertEquals(5, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     boolean sawNewDelta = false;
@@ -458,25 +467,25 @@ public void minorPartitionWithBase() throws Exception {
       if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24) 
+ "_v0000026")) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), 
FileUtils.HIDDEN_FILES_PATH_FILTER);
-        Assert.assertEquals(2, buckets.length);
-        
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-        
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(104L, buckets[0].getLen());
-        Assert.assertEquals(104L, buckets[1].getLen());
+        assertEquals(2, buckets.length);
+        assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        assertEquals(104L, buckets[0].getLen());
+        assertEquals(104L, buckets[1].getLen());
       }
       if 
(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24))) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), 
FileUtils.HIDDEN_FILES_PATH_FILTER);
-        Assert.assertEquals(2, buckets.length);
-        
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-        
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(104L, buckets[0].getLen());
-        Assert.assertEquals(104L, buckets[1].getLen());
+        assertEquals(2, buckets.length);
+        assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        assertEquals(104L, buckets[0].getLen());
+        assertEquals(104L, buckets[1].getLen());
       } else {
         LOG.debug("This is not the delta file you are looking for " + 
stat[i].getPath().getName());
       }
     }
-    Assert.assertTrue(toString(stat), sawNewDelta);
+    assertTrue(toString(stat), sawNewDelta);
   }
 
   @Test
@@ -496,13 +505,13 @@ public void minorTableNoBase() throws Exception {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+    assertEquals(1, compacts.size());
+    assertEquals("ready for cleaning", compacts.get(0).getState());
 
     // There should still now be 5 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(4, stat.length);
+    assertEquals(4, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     boolean sawNewDelta = false;
@@ -510,25 +519,25 @@ public void minorTableNoBase() throws Exception {
       if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(1, 4) + 
"_v0000006")) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), 
FileUtils.HIDDEN_FILES_PATH_FILTER);
-        Assert.assertEquals(2, buckets.length);
-        
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-        
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(104L, buckets[0].getLen());
-        Assert.assertEquals(104L, buckets[1].getLen());
+        assertEquals(2, buckets.length);
+        assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        assertEquals(104L, buckets[0].getLen());
+        assertEquals(104L, buckets[1].getLen());
       }
       if 
(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(1, 4) + 
"_v0000006")) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), 
FileUtils.HIDDEN_FILES_PATH_FILTER);
-        Assert.assertEquals(2, buckets.length);
-        
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-        
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(104L, buckets[0].getLen());
-        Assert.assertEquals(104L, buckets[1].getLen());
+        assertEquals(2, buckets.length);
+        assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        assertEquals(104L, buckets[0].getLen());
+        assertEquals(104L, buckets[1].getLen());
       } else {
         LOG.debug("This is not the delta file you are looking for " + 
stat[i].getPath().getName());
       }
     }
-    Assert.assertTrue(toString(stat), sawNewDelta);
+    assertTrue(toString(stat), sawNewDelta);
   }
 
   @Test
@@ -549,13 +558,13 @@ public void majorTableWithBase() throws Exception {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+    assertEquals(1, compacts.size());
+    assertEquals("ready for cleaning", compacts.get(0).getState());
 
     // There should still now be 5 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(4, stat.length);
+    assertEquals(4, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     boolean sawNewBase = false;
@@ -563,16 +572,16 @@ public void majorTableWithBase() throws Exception {
       if (stat[i].getPath().getName().equals("base_0000024_v0000026")) {
         sawNewBase = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), 
FileUtils.HIDDEN_FILES_PATH_FILTER);
-        Assert.assertEquals(2, buckets.length);
-        
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-        
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(624L, buckets[0].getLen());
-        Assert.assertEquals(624L, buckets[1].getLen());
+        assertEquals(2, buckets.length);
+        assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        assertEquals(624L, buckets[0].getLen());
+        assertEquals(624L, buckets[1].getLen());
       } else {
         LOG.debug("This is not the file you are looking for " + 
stat[i].getPath().getName());
       }
     }
-    Assert.assertTrue(toString(stat), sawNewBase);
+    assertTrue(toString(stat), sawNewBase);
   }
 
   @Test
@@ -625,14 +634,14 @@ private void compactNoBaseLotsOfDeltas(CompactionType 
type) throws Exception {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+    assertEquals(1, compacts.size());
+    assertEquals("ready for cleaning", compacts.get(0).getState());
 
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
     /* delete_delta_21_23 and delete_delta_25_33 which are created as a result 
of compacting*/
     int numFilesExpected = 11 + (type == CompactionType.MINOR ? 1 : 0);
-    Assert.assertEquals(numFilesExpected, stat.length);
+    assertEquals(numFilesExpected, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     List<String> matchesNotFound = new ArrayList<>(numFilesExpected);
@@ -665,7 +674,7 @@ private void compactNoBaseLotsOfDeltas(CompactionType type) 
throws Exception {
     if(matchesNotFound.size() == 0) {
       return;
     }
-    Assert.assertTrue("Files remaining: " + matchesNotFound + "; " + 
toString(stat), false);
+    assertTrue("Files remaining: " + matchesNotFound + "; " + toString(stat), 
false);
   }
   @Test
   public void majorPartitionWithBase() throws Exception {
@@ -687,13 +696,13 @@ public void majorPartitionWithBase() throws Exception {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+    assertEquals(1, compacts.size());
+    assertEquals("ready for cleaning", compacts.get(0).getState());
 
     // There should still be four directories in the location.
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
-    Assert.assertEquals(4, stat.length);
+    assertEquals(4, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     boolean sawNewBase = false;
@@ -701,16 +710,16 @@ public void majorPartitionWithBase() throws Exception {
       if (stat[i].getPath().getName().equals("base_0000024_v0000026")) {
         sawNewBase = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), 
FileUtils.HIDDEN_FILES_PATH_FILTER);
-        Assert.assertEquals(2, buckets.length);
-        
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-        
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(624L, buckets[0].getLen());
-        Assert.assertEquals(624L, buckets[1].getLen());
+        assertEquals(2, buckets.length);
+        assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        assertEquals(624L, buckets[0].getLen());
+        assertEquals(624L, buckets[1].getLen());
       } else {
         LOG.debug("This is not the file you are looking for " + 
stat[i].getPath().getName());
       }
     }
-    Assert.assertTrue(toString(stat), sawNewBase);
+    assertTrue(toString(stat), sawNewBase);
   }
 
   @Test
@@ -730,13 +739,13 @@ public void majorTableNoBase() throws Exception {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+    assertEquals(1, compacts.size());
+    assertEquals("ready for cleaning", compacts.get(0).getState());
 
     // There should now be 3 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(3, stat.length);
+    assertEquals(3, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     boolean sawNewBase = false;
@@ -744,16 +753,16 @@ public void majorTableNoBase() throws Exception {
       if (stat[i].getPath().getName().equals("base_0000004_v0000005")) {
         sawNewBase = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
-        Assert.assertEquals(2, buckets.length);
-        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(104L, buckets[0].getLen());
-        Assert.assertEquals(104L, buckets[1].getLen());
+        assertEquals(2, buckets.length);
+        assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        assertEquals(104L, buckets[0].getLen());
+        assertEquals(104L, buckets[1].getLen());
       } else {
         LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
       }
     }
-    Assert.assertTrue(toString(stat), sawNewBase);
+    assertTrue(toString(stat), sawNewBase);
   }
 
   private static String toString(FileStatus[] stat) {
@@ -785,8 +794,8 @@ public void majorTableLegacy() throws Exception {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+    assertEquals(1, compacts.size());
+    assertEquals("ready for cleaning", compacts.get(0).getState());
 
     // There should still now be 5 directories in the location
     FileSystem fs = FileSystem.get(conf);
@@ -799,16 +808,16 @@ public void majorTableLegacy() throws Exception {
       if (stat[i].getPath().getName().equals("base_0000024_v0000026")) {
         sawNewBase = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
-        Assert.assertEquals(2, buckets.length);
-        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(624L, buckets[0].getLen());
-        Assert.assertEquals(624L, buckets[1].getLen());
+        assertEquals(2, buckets.length);
+        assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        assertEquals(624L, buckets[0].getLen());
+        assertEquals(624L, buckets[1].getLen());
       } else {
         LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
       }
     }
-    Assert.assertTrue(toString(stat), sawNewBase);
+    assertTrue(toString(stat), sawNewBase);
   }
 
   @Test
@@ -829,8 +838,8 @@ public void minorTableLegacy() throws Exception {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+    assertEquals(1, compacts.size());
+    assertEquals("ready for cleaning", compacts.get(0).getState());
 
     // There should still now be 5 directories in the location
     FileSystem fs = FileSystem.get(conf);
@@ -842,14 +851,14 @@ public void minorTableLegacy() throws Exception {
       if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24) + "_v0000026")) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
-        Assert.assertEquals(2, buckets.length);
-        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        assertEquals(2, buckets.length);
+        assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
       } else {
         LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
       }
     }
-    Assert.assertTrue(toString(stat), sawNewDelta);
+    assertTrue(toString(stat), sawNewDelta);
   }
 
   @Test
@@ -873,13 +882,13 @@ public void majorPartitionWithBaseMissingBuckets() throws Exception {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+    assertEquals(1, compacts.size());
+    assertEquals("ready for cleaning", compacts.get(0).getState());
 
     // There should still be four directories in the location.
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
-    Assert.assertEquals(4, stat.length);
+    assertEquals(4, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     boolean sawNewBase = false;
@@ -887,11 +896,11 @@ public void majorPartitionWithBaseMissingBuckets() throws Exception {
       if (stat[i].getPath().getName().equals("base_0000026_v0000028")) {
         sawNewBase = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
-        Assert.assertEquals(2, buckets.length);
-        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        assertEquals(2, buckets.length);
+        assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
         // Bucket 0 should be small and bucket 1 should be large, make sure that's the case
-        Assert.assertTrue(
+        assertTrue(
             ("bucket_00000".equals(buckets[0].getPath().getName()) && 104L == buckets[0].getLen()
             && "bucket_00001".equals(buckets[1].getPath().getName()) && 676L == buckets[1]
                 .getLen())
@@ -904,7 +913,7 @@ public void majorPartitionWithBaseMissingBuckets() throws Exception {
         LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
       }
     }
-    Assert.assertTrue(toString(stat), sawNewBase);
+    assertTrue(toString(stat), sawNewBase);
   }
 
   @Test
@@ -926,21 +935,21 @@ public void majorWithOpenInMiddle() throws Exception {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+    assertEquals(1, compacts.size());
+    assertEquals("ready for cleaning", compacts.get(0).getState());
 
     // There should still now be 5 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(5, stat.length);
+    assertEquals(5, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     Arrays.sort(stat);
-    Assert.assertEquals("base_0000022_v0000028", stat[0].getPath().getName());
-    Assert.assertEquals("base_20", stat[1].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
+    assertEquals("base_0000022_v0000028", stat[0].getPath().getName());
+    assertEquals("base_20", stat[1].getPath().getName());
+    assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
+    assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
+    assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
   }
 
   @Test
@@ -962,21 +971,21 @@ public void majorWithAborted() throws Exception {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+    assertEquals(1, compacts.size());
+    assertEquals("ready for cleaning", compacts.get(0).getState());
 
     // There should still now be 5 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(5, stat.length);
+    assertEquals(5, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     Arrays.sort(stat);
-    Assert.assertEquals("base_0000027_v0000028", stat[0].getPath().getName());
-    Assert.assertEquals("base_20", stat[1].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
+    assertEquals("base_0000027_v0000028", stat[0].getPath().getName());
+    assertEquals("base_20", stat[1].getPath().getName());
+    assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
+    assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
+    assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
   }
   @Override
   boolean useHive130DeltaDirName() {
@@ -1008,10 +1017,10 @@ public void testWorkerAndInitiatorVersion() throws Exception {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
-    Assert.assertEquals(initiatorVersion, compacts.get(0).getInitiatorVersion());
-    Assert.assertEquals(workerVersion, compacts.get(0).getWorkerVersion());
+    assertEquals(1, compacts.size());
+    assertEquals("ready for cleaning", compacts.get(0).getState());
+    assertEquals(initiatorVersion, compacts.get(0).getInitiatorVersion());
+    assertEquals(workerVersion, compacts.get(0).getWorkerVersion());
 
   }
 
@@ -1072,7 +1081,7 @@ public void droppedTable() throws Exception {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(0, compacts.size());
+    assertEquals(0, compacts.size());
   }
 
   @Test
@@ -1097,7 +1106,7 @@ public void droppedPartition() throws Exception {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(0, compacts.size());
+    assertEquals(0, compacts.size());
   }
 
   @Test
@@ -1148,8 +1157,8 @@ public void insertOnlyDisabled() throws Exception {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("failed", compacts.get(0).getState());
+    assertEquals(1, compacts.size());
+    assertEquals("failed", compacts.get(0).getState());
 
   }
 
@@ -1162,21 +1171,21 @@ private void verifyTxn1IsAborted(int compactionNum, Table t, CompactionType type
     // Compaction should not have run on a single delta file
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(1, stat.length);
-    Assert.assertEquals(makeDeltaDirName(0, 2), stat[0].getPath().getName());
+    assertEquals(1, stat.length);
+    assertEquals(makeDeltaDirName(0, 2), stat[0].getPath().getName());
 
     // State should not be "ready for cleaning" because we skip cleaning
     List<ShowCompactResponseElement> compacts =
         txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
-    Assert.assertEquals(compactionNum + 1, compacts.size());
-    Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(compactionNum).getState());
+    assertEquals(compactionNum + 1, compacts.size());
+    assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(compactionNum).getState());
 
     // assert transaction with txnId=1 is still aborted after cleaner is run
     startCleaner();
     List<TxnInfo> openTxns =
         HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns();
-    Assert.assertEquals(1, openTxns.get(0).getId());
-    Assert.assertEquals(TxnState.ABORTED, openTxns.get(0).getState());
+    assertEquals(1, openTxns.get(0).getId());
+    assertEquals(TxnState.ABORTED, openTxns.get(0).getState());
   }
 
   // With high timeout, but fast run we should finish without a problem
@@ -1197,6 +1206,186 @@ public void testTimeoutWithoutInterrupt() throws Exception {
     runTimeoutTest(1, true, true);
   }
 
+  @Test
+  public void testExceptionWhenTxnCommitAndMarkFailed() throws Exception {
+    prepareTableAndCompaction("default", "tbforcomperror");
+    runWorkerWithException(MethodToFail.COMMIT_TXN, MethodToFail.MARK_FAILED);
+
+    List<ShowCompactResponseElement> compacts =
+        txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    assertEquals(TxnStore.WORKING_RESPONSE, compacts.get(0).getState());
+    List<TxnInfo> openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns();
+    assertEquals(1, openTxns.size());
+    TxnInfo txn = openTxns.get(0);
+    assertEquals(compacts.get(0).getTxnId(), txn.getId());
+    assertEquals(TxnState.OPEN, txn.getState());
+    txnHandler.abortTxn(new AbortTxnRequest(txn.getId()));
+  }
+
+  @Test
+  public void testExceptionWhenTxnCommit() throws Exception {
+    prepareTableAndCompaction("default", "tbforcomperror");
+    runWorkerWithException(MethodToFail.COMMIT_TXN);
+
+    List<ShowCompactResponseElement> compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    ShowCompactResponseElement compaction = compacts.get(0);
+    assertEquals(TxnStore.FAILED_RESPONSE, compaction.getState());
+    assertEquals("Simulated failure in commitTxn", compaction.getErrorMessage());
+    List<TxnInfo> openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns();
+    assertEquals(1, openTxns.size());
+    TxnInfo txn = openTxns.get(0);
+    assertEquals(compaction.getTxnId(), txn.getId());
+    assertEquals(TxnState.OPEN, txn.getState());
+    txnHandler.abortTxn(new AbortTxnRequest(txn.getId()));
+  }
+
+  @Test
+  public void testExceptionWhenMarkCompacted() throws Exception {
+    prepareTableAndCompaction("default", "tbforcomperror");
+    runWorkerWithException(MethodToFail.MARK_COMPACTED);
+
+    List<ShowCompactResponseElement> compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    ShowCompactResponseElement compaction = compacts.get(0);
+    assertEquals(TxnStore.FAILED_RESPONSE, compaction.getState());
+    assertEquals("Simulated failure in markCompacted", compaction.getErrorMessage());
+    List<TxnInfo> openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns();
+    assertEquals(0, openTxns.size());
+  }
+
+  @Test
+  public void testExceptionDuringCompact() throws Exception {
+    prepareTableAndCompaction("default", "tbforcomperror");
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST, true);
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TEST_MODE_FAIL_COMPACTION, true);
+    startWorker();
+
+    List<ShowCompactResponseElement> compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    ShowCompactResponseElement compaction = compacts.get(0);
+    assertEquals(TxnStore.FAILED_RESPONSE, compaction.getState());
+    assertEquals("HIVE_TEST_MODE_FAIL_COMPACTION=true", compaction.getErrorMessage());
+    List<TxnInfo> openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns();
+    assertEquals(1, openTxns.size());
+    TxnInfo txn = openTxns.get(0);
+    assertEquals(compaction.getTxnId(), txn.getId());
+    assertEquals(TxnState.ABORTED, txn.getState());
+  }
+
+  @Test
+  public void testWorkerIfIsDynPartAbort() throws Exception {
+    String dbName = "default";
+    String tableName = "tbforcomperror";
+    Table t = newTable(dbName, tableName, true);
+    addBaseFile(t, null, 1L, 3, 1);
+    addDeltaFile(t, null, 2L, 2L, 1);
+    addDeltaFile(t, null, 3L, 3L, 1);
+    addDeltaFile(t, null, 4L, 4L, 1);
+    burnThroughTransactions(dbName, tableName, 4, null, null);
+    // trigger compaction
+    CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+    rqst.setPartitionname(null);
+    txnHandler.compact(rqst);
+    startWorker();
+
+    List<ShowCompactResponseElement> compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    ShowCompactResponseElement compaction = compacts.get(0);
+    assertEquals(TxnStore.CLEANING_RESPONSE, compaction.getState());
+    assertTrue(compaction.getNextTxnId() > 0L);
+    List<TxnInfo> openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns();
+    assertEquals(0, openTxns.size());
+  }
+
+  @Test
+  public void testWorkerNotEnoughToCompact() throws Exception {
+    String dbName = "default";
+    String tableName = "tbforcomperror";
+    Table t = newTable(dbName, tableName, false);
+    addBaseFile(t, null, 1L, 3, 1);
+    burnThroughTransactions(dbName, tableName, 1, null, null);
+    // trigger compaction
+    CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+    txnHandler.compact(rqst);
+    startWorker();
+
+    List<ShowCompactResponseElement> compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    ShowCompactResponseElement compaction = compacts.get(0);
+    assertEquals(TxnStore.REFUSED_RESPONSE, compaction.getState());
+    assertTrue(compaction.getErrorMessage().contains("None of the compaction thresholds met, compaction request is refused!"));
+    List<TxnInfo> openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns();
+    assertEquals(0, openTxns.size());
+  }
+
+  @Test
+  public void testWorkerNotEnoughToCompactNeedsCleaning() throws Exception {
+    String dbName = "default";
+    String tableName = "tbforcomperror";
+    Table t = newTable(dbName, tableName, false);
+    addDeltaFile(t, null, 20L, 20L, 10);
+    addDeltaFile(t, null, 21L, 21L, 10);
+    addDeltaFile(t, null, 22L, 22L, 10);
+    addDeltaFile(t, null, 23L, 23L, 10);
+    addDeltaFile(t, null, 24L, 24L, 10);
+    burnThroughTransactions(dbName, tableName, 25, null, new HashSet<Long>(Arrays.asList(20L, 21L, 22L, 23L, 24L)));
+    // trigger compaction
+    CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+    txnHandler.compact(rqst);
+    startWorker();
+
+    List<ShowCompactResponseElement> compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    ShowCompactResponseElement compaction = compacts.get(0);
+    assertEquals(TxnStore.CLEANING_RESPONSE, compaction.getState());
+    assertTrue(compaction.getNextTxnId() > 0L);
+    List<TxnInfo> openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns();
+    assertEquals(5, openTxns.size());
+    assertEquals(20L, openTxns.get(0).getId());
+    assertEquals(21L, openTxns.get(1).getId());
+    assertEquals(22L, openTxns.get(2).getId());
+    assertEquals(23L, openTxns.get(3).getId());
+    assertEquals(24L, openTxns.get(4).getId());
+
+  }
+
+  private void runWorkerWithException(MethodToFail... methodToFail) throws Exception {
+    IMetaStoreClient spyMsc = Mockito.spy(ms);
+    for (MethodToFail method: methodToFail) {
+      switch (method) {
+      case MARK_FAILED -> doThrow(new TTransportException("Simulated failure in markFailed")).when(spyMsc).markFailed(any());
+      case COMMIT_TXN -> doThrow(new TException("Simulated failure in commitTxn")).when(spyMsc).commitTxn(anyLong());
+      case MARK_COMPACTED -> doThrow(new TTransportException("Simulated failure in markCompacted")).when(spyMsc).markCompacted(any());
+      }
+    }
+
+    TestTxnDbUtil.setConfValues(conf);
+    Worker worker = Mockito.spy(new Worker());
+    worker.setConf(conf);
+    AtomicBoolean stop = new AtomicBoolean();
+    stop.set(true);
+    worker.init(stop);
+    worker.msc = spyMsc;
+    worker.setName("testworker");
+    CompactorThread ct = worker;
+    ct.run();
+  }
+
+  private void prepareTableAndCompaction(String dbName, String tableName) throws Exception {
+    Table t = newTable(dbName, tableName, false);
+    addBaseFile(t, null, 20L, 20);
+    addDeltaFile(t, null, 21L, 23L, 2);
+    addDeltaFile(t, null, 23L, 23L, 3);
+    addDeltaFile(t, null, 24L, 24L, 2);
+    addDeltaFile(t, null, 25L, 25L, 3);
+    addDeltaFile(t, null, 26L, 26L, 3);
+    burnThroughTransactions(dbName, tableName, 27, null, null);
+    // trigger compaction
+    CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+    txnHandler.compact(rqst);
+  }
+
+  enum MethodToFail {
+    MARK_COMPACTED,
+    MARK_FAILED,
+    COMMIT_TXN;
+  }
+
   private void runTimeoutTest(long timeout, boolean runForever, boolean swallowInterrupt) throws Exception {
     ExecutorService executor = Executors.newSingleThreadExecutor();
     HiveConf timeoutConf = new HiveConf(conf);

Reply via email to