This is an automated email from the ASF dual-hosted git repository.
kuczoram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 83d98f42fc7 HIVE-29571: ACID Compaction: Marking the compaction as
compacted should happen after its txn got committed (#6511)
83d98f42fc7 is described below
commit 83d98f42fc7374478e31a3e389be5981e7d489d2
Author: Marta Kuczora <[email protected]>
AuthorDate: Fri May 29 09:57:56 2026 +0200
HIVE-29571: ACID Compaction: Marking the compaction as compacted should
happen after its txn got committed (#6511)
---
.../compactor/service/AcidCompactionService.java | 49 +-
.../hadoop/hive/ql/txn/compactor/TestWorker.java | 529 ++++++++++++++-------
2 files changed, 387 insertions(+), 191 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
index d1c7e3972d0..cf6e4c02e4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.hive.ql.txn.compactor.CompactorFactory;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorPipeline;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable;
import org.apache.hadoop.hive.ql.txn.compactor.QueryCompactor;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.common.util.Ref;
@@ -211,32 +212,27 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception {
// Don't start compaction or cleaning if not necessary
if (isDynPartAbort(table, ci)) {
- msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
- compactionTxn.wasSuccessful();
+ compactionTxn.markForCommit(() ->
msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)));
return false;
}
dir = getAcidStateForWorker(ci, sd, tblValidWriteIds);
if (!isEnoughToCompact(ci, dir, sd)) {
if (needsCleaning(dir, sd)) {
- msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
+ compactionTxn.markForCommit(() ->
msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)));
} else {
// do nothing
ci.errorMessage = "None of the compaction thresholds met, compaction
request is refused!";
LOG.debug(ci.errorMessage + " Compaction info: {}", ci);
- msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
+ compactionTxn.markForCommit(() ->
msc.markRefused(CompactionInfo.compactionInfoToStruct(ci)));
+
}
- compactionTxn.wasSuccessful();
return false;
}
if (!ci.isMajorCompaction() &&
!CompactorUtil.isMinorCompactionSupported(conf, table.getParameters(), dir)) {
ci.errorMessage = "Query based Minor compaction is not possible for
full acid tables having raw format " +
"(non-acid) data in them.";
LOG.error(ci.errorMessage + " Compaction info: {}", ci);
- try {
- msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
- } catch (Throwable tr) {
- LOG.error("Caught an exception while trying to mark compaction {} as
failed: {}", ci, tr);
- }
+ compactionTxn.markForAbort(() ->
msc.markRefused(CompactionInfo.compactionInfoToStruct(ci)));
return false;
}
CompactorUtil.checkInterrupt(CLASS_NAME);
@@ -261,8 +257,7 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception {
LOG.info("Completed " + ci.type.toString() + " compaction for " +
ci.getFullPartitionName() + " in "
+ compactionTxn + ", marking as compacted.");
- msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
- compactionTxn.wasSuccessful();
+ compactionTxn.markForCommit(() ->
msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)));
AcidMetricService.updateMetricsFromWorker(ci.dbname, ci.tableName,
ci.partName, ci.type,
dir.getCurrentDirectories().size(), dir.getDeleteDeltas().size(),
conf, msc);
@@ -346,7 +341,11 @@ class CompactionTxn implements AutoCloseable {
private long lockId = 0;
private TxnStatus status = TxnStatus.UNKNOWN;
- private boolean successfulCompaction = false;
+
+ private ThrowingRunnable onCommitSuccess;
+ private ThrowingRunnable onAbortSuccess;
+
+ private boolean rollbackOnly = true;
/**
* Try to open a new txn.
@@ -377,11 +376,13 @@ private LockRequest createLockRequest(CompactionInfo ci) {
return CompactorUtil.createLockRequest(conf, ci, txnId,
lockAndOpType.getKey(), lockAndOpType.getValue());
}
- /**
- * Mark compaction as successful. This means the txn will be committed;
otherwise it will be aborted.
- */
- void wasSuccessful() {
- this.successfulCompaction = true;
+ void markForCommit(ThrowingRunnable action) {
+ this.rollbackOnly = false;
+ this.onCommitSuccess = action;
+ }
+
+ void markForAbort(ThrowingRunnable action) {
+ this.onAbortSuccess = action;
}
/**
@@ -396,10 +397,16 @@ public void close() throws Exception {
//the transaction is about to close, we can stop heartbeating
regardless of it's state
CompactionHeartbeatService.getInstance(conf).stopHeartbeat(txnId);
} finally {
- if (successfulCompaction) {
- commit();
- } else {
+ if (rollbackOnly) {
abort();
+ if (onAbortSuccess != null) {
+ onAbortSuccess.run();
+ }
+ return;
+ }
+ commit();
+ if (onCommitSuccess != null) {
+ onCommitSuccess.run();
}
}
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index bf01034711e..f97237353f7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.FindNextCompactRequest;
@@ -41,7 +42,10 @@
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.utils.StringableMap;
+import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -70,8 +74,13 @@
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.hive.common.AcidConstants.VISIBILITY_PATTERN;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -100,9 +109,9 @@ public void stringableMap() throws Exception {
// Empty map case
StringableMap m = new StringableMap(new HashMap<String, String>());
String s = m.toString();
- Assert.assertEquals("0:", s);
+ assertEquals("0:", s);
m = new StringableMap(s);
- Assert.assertEquals(0, m.size());
+ assertEquals(0, m.size());
Map<String, String> base = new HashMap<String, String>();
base.put("mary", "poppins");
@@ -111,22 +120,22 @@ public void stringableMap() throws Exception {
m = new StringableMap(base);
s = m.toString();
m = new StringableMap(s);
- Assert.assertEquals(3, m.size());
+ assertEquals(3, m.size());
Map<String, Boolean> saw = new HashMap<String, Boolean>(3);
saw.put("mary", false);
saw.put("bert", false);
saw.put(null, false);
for (Map.Entry<String, String> e : m.entrySet()) {
saw.put(e.getKey(), true);
- if ("mary".equals(e.getKey())) Assert.assertEquals("poppins",
e.getValue());
+ if ("mary".equals(e.getKey())) assertEquals("poppins", e.getValue());
else if ("bert".equals(e.getKey())) Assert.assertNull(e.getValue());
- else if (null == e.getKey()) Assert.assertEquals("banks", e.getValue());
+ else if (null == e.getKey()) assertEquals("banks", e.getValue());
else Assert.fail("Unexpected value " + e.getKey());
}
- Assert.assertEquals(3, saw.size());
- Assert.assertTrue(saw.get("mary"));
- Assert.assertTrue(saw.get("bert"));
- Assert.assertTrue(saw.get(null));
+ assertEquals(3, saw.size());
+ assertTrue(saw.get("mary"));
+ assertTrue(saw.get("bert"));
+ assertTrue(saw.get(null));
}
@Test
@@ -134,26 +143,26 @@ public void stringableList() throws Exception {
// Empty list case
MRCompactor.StringableList ls = new MRCompactor.StringableList();
String s = ls.toString();
- Assert.assertEquals("0:", s);
+ assertEquals("0:", s);
ls = new MRCompactor.StringableList(s);
- Assert.assertEquals(0, ls.size());
+ assertEquals(0, ls.size());
ls = new MRCompactor.StringableList();
ls.add(new Path("/tmp"));
ls.add(new Path("/usr"));
s = ls.toString();
- Assert.assertTrue("Expected 2:4:/tmp4:/usr or 2:4:/usr4:/tmp, got " + s,
+ assertTrue("Expected 2:4:/tmp4:/usr or 2:4:/usr4:/tmp, got " + s,
"2:4:/tmp4:/usr".equals(s) || "2:4:/usr4:/tmp".equals(s));
ls = new MRCompactor.StringableList(s);
- Assert.assertEquals(2, ls.size());
+ assertEquals(2, ls.size());
boolean sawTmp = false, sawUsr = false;
for (Path p : ls) {
if ("/tmp".equals(p.toString())) sawTmp = true;
else if ("/usr".equals(p.toString())) sawUsr = true;
else Assert.fail("Unexpected path " + p.toString());
}
- Assert.assertTrue(sawTmp);
- Assert.assertTrue(sawUsr);
+ assertTrue(sawTmp);
+ assertTrue(sawUsr);
}
@Test
@@ -181,10 +190,10 @@ public void inputSplit() throws Exception {
MRCompactor.CompactorInputSplit split =
new MRCompactor.CompactorInputSplit(conf, 3, files, new
Path(basename), deltas, new HashMap<String, Integer>());
- Assert.assertEquals(520L, split.getLength());
+ assertEquals(520L, split.getLength());
String[] locations = split.getLocations();
- Assert.assertEquals(1, locations.length);
- Assert.assertEquals("localhost", locations[0]);
+ assertEquals(1, locations.length);
+ assertEquals("localhost", locations[0]);
ByteArrayOutputStream buf = new ByteArrayOutputStream();
DataOutput out = new DataOutputStream(buf);
@@ -194,12 +203,12 @@ public void inputSplit() throws Exception {
DataInput in = new DataInputStream(new
ByteArrayInputStream(buf.toByteArray()));
split.readFields(in);
- Assert.assertEquals(3, split.getBucket());
- Assert.assertEquals(basename, split.getBaseDir().toString());
+ assertEquals(3, split.getBucket());
+ assertEquals(basename, split.getBaseDir().toString());
deltas = split.getDeltaDirs();
- Assert.assertEquals(2, deltas.length);
- Assert.assertEquals(delta1, deltas[0].toString());
- Assert.assertEquals(delta2, deltas[1].toString());
+ assertEquals(2, deltas.length);
+ assertEquals(delta1, deltas[0].toString());
+ assertEquals(delta2, deltas[1].toString());
}
@Test
@@ -234,12 +243,12 @@ public void inputSplitNullBase() throws Exception {
DataInput in = new DataInputStream(new
ByteArrayInputStream(buf.toByteArray()));
split.readFields(in);
- Assert.assertEquals(3, split.getBucket());
+ assertEquals(3, split.getBucket());
Assert.assertNull(split.getBaseDir());
deltas = split.getDeltaDirs();
- Assert.assertEquals(2, deltas.length);
- Assert.assertEquals(delta1, deltas[0].toString());
- Assert.assertEquals(delta2, deltas[1].toString());
+ assertEquals(2, deltas.length);
+ assertEquals(delta1, deltas[0].toString());
+ assertEquals(delta2, deltas[1].toString());
}
@Test
@@ -264,7 +273,7 @@ public void sortedTable() throws Exception {
// There should still be four directories in the location.
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
- Assert.assertEquals(4, stat.length);
+ assertEquals(4, stat.length);
}
@Test
@@ -291,7 +300,7 @@ public void sortedPartition() throws Exception {
// There should still be four directories in the location.
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
- Assert.assertEquals(4, stat.length);
+ assertEquals(4, stat.length);
}
@Test
@@ -312,13 +321,13 @@ public void minorTableWithBase() throws Exception {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+ assertEquals(1, compacts.size());
+ assertEquals("ready for cleaning", compacts.get(0).getState());
// There should still now be 5 directories in the location
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
- Assert.assertEquals(5, stat.length);
+ assertEquals(5, stat.length);
// Find the new delta file and make sure it has the right contents
boolean sawNewDelta = false;
@@ -326,26 +335,26 @@ public void minorTableWithBase() throws Exception {
if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24)
+ "_v0000026")) {
sawNewDelta = true;
FileStatus[] buckets = fs.listStatus(stat[i].getPath(),
FileUtils.HIDDEN_FILES_PATH_FILTER);
- Assert.assertEquals(2, buckets.length);
-
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(104L, buckets[0].getLen());
- Assert.assertEquals(104L, buckets[1].getLen());
+ assertEquals(2, buckets.length);
+ assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+ assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+ assertEquals(104L, buckets[0].getLen());
+ assertEquals(104L, buckets[1].getLen());
}
if
(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24) +
"_v0000026")) {
sawNewDelta = true;
FileStatus[] buckets = fs.listStatus(stat[i].getPath(),
FileUtils.HIDDEN_FILES_PATH_FILTER);
- Assert.assertEquals(2, buckets.length);
-
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(104L, buckets[0].getLen());
- Assert.assertEquals(104L, buckets[1].getLen());
+ assertEquals(2, buckets.length);
+ assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+ assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+ assertEquals(104L, buckets[0].getLen());
+ assertEquals(104L, buckets[1].getLen());
}
else {
LOG.debug("This is not the delta file you are looking for " +
stat[i].getPath().getName());
}
}
- Assert.assertTrue(toString(stat), sawNewDelta);
+ assertTrue(toString(stat), sawNewDelta);
}
/**
@@ -372,20 +381,20 @@ public void minorWithOpenInMiddle() throws Exception {
// since compaction was not run, state should not be "ready for cleaning"
but "refused"
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(0).getState());
+ assertEquals(1, compacts.size());
+ assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(0).getState());
// There should still be 4 directories in the location
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
- Assert.assertEquals(toString(stat), 4, stat.length);
+ assertEquals(toString(stat), 4, stat.length);
// Find the new delta file and make sure it has the right contents
Arrays.sort(stat);
- Assert.assertEquals("base_20", stat[0].getPath().getName());
- Assert.assertEquals(makeDeltaDirName(21, 22), stat[1].getPath().getName());
- Assert.assertEquals(makeDeltaDirName(23, 25), stat[2].getPath().getName());
- Assert.assertEquals(makeDeltaDirName(26, 27), stat[3].getPath().getName());
+ assertEquals("base_20", stat[0].getPath().getName());
+ assertEquals(makeDeltaDirName(21, 22), stat[1].getPath().getName());
+ assertEquals(makeDeltaDirName(23, 25), stat[2].getPath().getName());
+ assertEquals(makeDeltaDirName(26, 27), stat[3].getPath().getName());
}
@Test
@@ -407,22 +416,22 @@ public void minorWithAborted() throws Exception {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+ assertEquals(1, compacts.size());
+ assertEquals("ready for cleaning", compacts.get(0).getState());
// There should still now be 6 directories in the location
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
- Assert.assertEquals(6, stat.length);
+ assertEquals(6, stat.length);
// Find the new delta file and make sure it has the right contents
Arrays.sort(stat);
- Assert.assertEquals("base_20", stat[0].getPath().getName());
- Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 27) + "_v0000028",
stat[1].getPath().getName());
- Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
- Assert.assertEquals(makeDeltaDirNameCompacted(21, 27) + "_v0000028",
stat[3].getPath().getName());
- Assert.assertEquals(makeDeltaDirName(23, 25), stat[4].getPath().getName());
- Assert.assertEquals(makeDeltaDirName(26, 27), stat[5].getPath().getName());
+ assertEquals("base_20", stat[0].getPath().getName());
+ assertEquals(makeDeleteDeltaDirNameCompacted(21, 27) + "_v0000028",
stat[1].getPath().getName());
+ assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
+ assertEquals(makeDeltaDirNameCompacted(21, 27) + "_v0000028",
stat[3].getPath().getName());
+ assertEquals(makeDeltaDirName(23, 25), stat[4].getPath().getName());
+ assertEquals(makeDeltaDirName(26, 27), stat[5].getPath().getName());
}
@Test
@@ -444,13 +453,13 @@ public void minorPartitionWithBase() throws Exception {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+ assertEquals(1, compacts.size());
+ assertEquals("ready for cleaning", compacts.get(0).getState());
// There should still be four directories in the location.
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
- Assert.assertEquals(5, stat.length);
+ assertEquals(5, stat.length);
// Find the new delta file and make sure it has the right contents
boolean sawNewDelta = false;
@@ -458,25 +467,25 @@ public void minorPartitionWithBase() throws Exception {
if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24)
+ "_v0000026")) {
sawNewDelta = true;
FileStatus[] buckets = fs.listStatus(stat[i].getPath(),
FileUtils.HIDDEN_FILES_PATH_FILTER);
- Assert.assertEquals(2, buckets.length);
-
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(104L, buckets[0].getLen());
- Assert.assertEquals(104L, buckets[1].getLen());
+ assertEquals(2, buckets.length);
+ assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+ assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+ assertEquals(104L, buckets[0].getLen());
+ assertEquals(104L, buckets[1].getLen());
}
if
(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24))) {
sawNewDelta = true;
FileStatus[] buckets = fs.listStatus(stat[i].getPath(),
FileUtils.HIDDEN_FILES_PATH_FILTER);
- Assert.assertEquals(2, buckets.length);
-
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(104L, buckets[0].getLen());
- Assert.assertEquals(104L, buckets[1].getLen());
+ assertEquals(2, buckets.length);
+ assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+ assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+ assertEquals(104L, buckets[0].getLen());
+ assertEquals(104L, buckets[1].getLen());
} else {
LOG.debug("This is not the delta file you are looking for " +
stat[i].getPath().getName());
}
}
- Assert.assertTrue(toString(stat), sawNewDelta);
+ assertTrue(toString(stat), sawNewDelta);
}
@Test
@@ -496,13 +505,13 @@ public void minorTableNoBase() throws Exception {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+ assertEquals(1, compacts.size());
+ assertEquals("ready for cleaning", compacts.get(0).getState());
// There should still now be 5 directories in the location
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
- Assert.assertEquals(4, stat.length);
+ assertEquals(4, stat.length);
// Find the new delta file and make sure it has the right contents
boolean sawNewDelta = false;
@@ -510,25 +519,25 @@ public void minorTableNoBase() throws Exception {
if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(1, 4) +
"_v0000006")) {
sawNewDelta = true;
FileStatus[] buckets = fs.listStatus(stat[i].getPath(),
FileUtils.HIDDEN_FILES_PATH_FILTER);
- Assert.assertEquals(2, buckets.length);
-
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(104L, buckets[0].getLen());
- Assert.assertEquals(104L, buckets[1].getLen());
+ assertEquals(2, buckets.length);
+ assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+ assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+ assertEquals(104L, buckets[0].getLen());
+ assertEquals(104L, buckets[1].getLen());
}
if
(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(1, 4) +
"_v0000006")) {
sawNewDelta = true;
FileStatus[] buckets = fs.listStatus(stat[i].getPath(),
FileUtils.HIDDEN_FILES_PATH_FILTER);
- Assert.assertEquals(2, buckets.length);
-
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(104L, buckets[0].getLen());
- Assert.assertEquals(104L, buckets[1].getLen());
+ assertEquals(2, buckets.length);
+ assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+ assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+ assertEquals(104L, buckets[0].getLen());
+ assertEquals(104L, buckets[1].getLen());
} else {
LOG.debug("This is not the delta file you are looking for " +
stat[i].getPath().getName());
}
}
- Assert.assertTrue(toString(stat), sawNewDelta);
+ assertTrue(toString(stat), sawNewDelta);
}
@Test
@@ -549,13 +558,13 @@ public void majorTableWithBase() throws Exception {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+ assertEquals(1, compacts.size());
+ assertEquals("ready for cleaning", compacts.get(0).getState());
// There should still now be 5 directories in the location
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
- Assert.assertEquals(4, stat.length);
+ assertEquals(4, stat.length);
// Find the new delta file and make sure it has the right contents
boolean sawNewBase = false;
@@ -563,16 +572,16 @@ public void majorTableWithBase() throws Exception {
if (stat[i].getPath().getName().equals("base_0000024_v0000026")) {
sawNewBase = true;
FileStatus[] buckets = fs.listStatus(stat[i].getPath(),
FileUtils.HIDDEN_FILES_PATH_FILTER);
- Assert.assertEquals(2, buckets.length);
-
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(624L, buckets[0].getLen());
- Assert.assertEquals(624L, buckets[1].getLen());
+ assertEquals(2, buckets.length);
+ assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+ assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+ assertEquals(624L, buckets[0].getLen());
+ assertEquals(624L, buckets[1].getLen());
} else {
LOG.debug("This is not the file you are looking for " +
stat[i].getPath().getName());
}
}
- Assert.assertTrue(toString(stat), sawNewBase);
+ assertTrue(toString(stat), sawNewBase);
}
@Test
@@ -625,14 +634,14 @@ private void compactNoBaseLotsOfDeltas(CompactionType type) throws Exception {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+ assertEquals(1, compacts.size());
+ assertEquals("ready for cleaning", compacts.get(0).getState());
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
/* delete_delta_21_23 and delete_delta_25_33 which are created as a result
of compacting*/
int numFilesExpected = 11 + (type == CompactionType.MINOR ? 1 : 0);
- Assert.assertEquals(numFilesExpected, stat.length);
+ assertEquals(numFilesExpected, stat.length);
// Find the new delta file and make sure it has the right contents
List<String> matchesNotFound = new ArrayList<>(numFilesExpected);
@@ -665,7 +674,7 @@ private void compactNoBaseLotsOfDeltas(CompactionType type) throws Exception {
if(matchesNotFound.size() == 0) {
return;
}
- Assert.assertTrue("Files remaining: " + matchesNotFound + "; " +
toString(stat), false);
+ assertTrue("Files remaining: " + matchesNotFound + "; " + toString(stat),
false);
}
@Test
public void majorPartitionWithBase() throws Exception {
@@ -687,13 +696,13 @@ public void majorPartitionWithBase() throws Exception {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+ assertEquals(1, compacts.size());
+ assertEquals("ready for cleaning", compacts.get(0).getState());
// There should still be four directories in the location.
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
- Assert.assertEquals(4, stat.length);
+ assertEquals(4, stat.length);
// Find the new delta file and make sure it has the right contents
boolean sawNewBase = false;
@@ -701,16 +710,16 @@ public void majorPartitionWithBase() throws Exception {
if (stat[i].getPath().getName().equals("base_0000024_v0000026")) {
sawNewBase = true;
FileStatus[] buckets = fs.listStatus(stat[i].getPath(),
FileUtils.HIDDEN_FILES_PATH_FILTER);
- Assert.assertEquals(2, buckets.length);
-
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(624L, buckets[0].getLen());
- Assert.assertEquals(624L, buckets[1].getLen());
+ assertEquals(2, buckets.length);
+ assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+ assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+ assertEquals(624L, buckets[0].getLen());
+ assertEquals(624L, buckets[1].getLen());
} else {
LOG.debug("This is not the file you are looking for " +
stat[i].getPath().getName());
}
}
- Assert.assertTrue(toString(stat), sawNewBase);
+ assertTrue(toString(stat), sawNewBase);
}
@Test
@@ -730,13 +739,13 @@ public void majorTableNoBase() throws Exception {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+ assertEquals(1, compacts.size());
+ assertEquals("ready for cleaning", compacts.get(0).getState());
// There should now be 3 directories in the location
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
- Assert.assertEquals(3, stat.length);
+ assertEquals(3, stat.length);
// Find the new delta file and make sure it has the right contents
boolean sawNewBase = false;
@@ -744,16 +753,16 @@ public void majorTableNoBase() throws Exception {
if (stat[i].getPath().getName().equals("base_0000004_v0000005")) {
sawNewBase = true;
FileStatus[] buckets = fs.listStatus(stat[i].getPath(),
FileUtils.HIDDEN_FILES_PATH_FILTER);
- Assert.assertEquals(2, buckets.length);
-
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(104L, buckets[0].getLen());
- Assert.assertEquals(104L, buckets[1].getLen());
+ assertEquals(2, buckets.length);
+ assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+ assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+ assertEquals(104L, buckets[0].getLen());
+ assertEquals(104L, buckets[1].getLen());
} else {
LOG.debug("This is not the file you are looking for " +
stat[i].getPath().getName());
}
}
- Assert.assertTrue(toString(stat), sawNewBase);
+ assertTrue(toString(stat), sawNewBase);
}
private static String toString(FileStatus[] stat) {
@@ -785,8 +794,8 @@ public void majorTableLegacy() throws Exception {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+ assertEquals(1, compacts.size());
+ assertEquals("ready for cleaning", compacts.get(0).getState());
// There should still now be 5 directories in the location
FileSystem fs = FileSystem.get(conf);
@@ -799,16 +808,16 @@ public void majorTableLegacy() throws Exception {
if (stat[i].getPath().getName().equals("base_0000024_v0000026")) {
sawNewBase = true;
FileStatus[] buckets = fs.listStatus(stat[i].getPath(),
FileUtils.HIDDEN_FILES_PATH_FILTER);
- Assert.assertEquals(2, buckets.length);
-
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(624L, buckets[0].getLen());
- Assert.assertEquals(624L, buckets[1].getLen());
+ assertEquals(2, buckets.length);
+ assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+ assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+ assertEquals(624L, buckets[0].getLen());
+ assertEquals(624L, buckets[1].getLen());
} else {
LOG.debug("This is not the file you are looking for " +
stat[i].getPath().getName());
}
}
- Assert.assertTrue(toString(stat), sawNewBase);
+ assertTrue(toString(stat), sawNewBase);
}
@Test
@@ -829,8 +838,8 @@ public void minorTableLegacy() throws Exception {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+ assertEquals(1, compacts.size());
+ assertEquals("ready for cleaning", compacts.get(0).getState());
// There should still now be 5 directories in the location
FileSystem fs = FileSystem.get(conf);
@@ -842,14 +851,14 @@ public void minorTableLegacy() throws Exception {
if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24) + "_v0000026")) {
sawNewDelta = true;
FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
- Assert.assertEquals(2, buckets.length);
- Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+ assertEquals(2, buckets.length);
+ assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+ assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
} else {
LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
}
}
- Assert.assertTrue(toString(stat), sawNewDelta);
+ assertTrue(toString(stat), sawNewDelta);
}
@Test
@@ -873,13 +882,13 @@ public void majorPartitionWithBaseMissingBuckets() throws Exception {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+ assertEquals(1, compacts.size());
+ assertEquals("ready for cleaning", compacts.get(0).getState());
// There should still be four directories in the location.
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
- Assert.assertEquals(4, stat.length);
+ assertEquals(4, stat.length);
// Find the new delta file and make sure it has the right contents
boolean sawNewBase = false;
@@ -887,11 +896,11 @@ public void majorPartitionWithBaseMissingBuckets() throws Exception {
if (stat[i].getPath().getName().equals("base_0000026_v0000028")) {
sawNewBase = true;
FileStatus[] buckets = fs.listStatus(stat[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
- Assert.assertEquals(2, buckets.length);
- Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+ assertEquals(2, buckets.length);
+ assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+ assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
// Bucket 0 should be small and bucket 1 should be large, make sure that's the case
- Assert.assertTrue(
+ assertTrue(
("bucket_00000".equals(buckets[0].getPath().getName()) && 104L == buckets[0].getLen()
&& "bucket_00001".equals(buckets[1].getPath().getName()) && 676L == buckets[1]
.getLen())
@@ -904,7 +913,7 @@ public void majorPartitionWithBaseMissingBuckets() throws Exception {
LOG.debug("This is not the file you are looking for " + stat[i].getPath().getName());
}
}
- Assert.assertTrue(toString(stat), sawNewBase);
+ assertTrue(toString(stat), sawNewBase);
}
@Test
@@ -926,21 +935,21 @@ public void majorWithOpenInMiddle() throws Exception {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+ assertEquals(1, compacts.size());
+ assertEquals("ready for cleaning", compacts.get(0).getState());
// There should still now be 5 directories in the location
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
- Assert.assertEquals(5, stat.length);
+ assertEquals(5, stat.length);
// Find the new delta file and make sure it has the right contents
Arrays.sort(stat);
- Assert.assertEquals("base_0000022_v0000028", stat[0].getPath().getName());
- Assert.assertEquals("base_20", stat[1].getPath().getName());
- Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
- Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
- Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
+ assertEquals("base_0000022_v0000028", stat[0].getPath().getName());
+ assertEquals("base_20", stat[1].getPath().getName());
+ assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
+ assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
+ assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
}
@Test
@@ -962,21 +971,21 @@ public void majorWithAborted() throws Exception {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+ assertEquals(1, compacts.size());
+ assertEquals("ready for cleaning", compacts.get(0).getState());
// There should still now be 5 directories in the location
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
- Assert.assertEquals(5, stat.length);
+ assertEquals(5, stat.length);
// Find the new delta file and make sure it has the right contents
Arrays.sort(stat);
- Assert.assertEquals("base_0000027_v0000028", stat[0].getPath().getName());
- Assert.assertEquals("base_20", stat[1].getPath().getName());
- Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
- Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
- Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
+ assertEquals("base_0000027_v0000028", stat[0].getPath().getName());
+ assertEquals("base_20", stat[1].getPath().getName());
+ assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
+ assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
+ assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
}
@Override
boolean useHive130DeltaDirName() {
@@ -1008,10 +1017,10 @@ public void testWorkerAndInitiatorVersion() throws Exception {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
- Assert.assertEquals(initiatorVersion, compacts.get(0).getInitiatorVersion());
- Assert.assertEquals(workerVersion, compacts.get(0).getWorkerVersion());
+ assertEquals(1, compacts.size());
+ assertEquals("ready for cleaning", compacts.get(0).getState());
+ assertEquals(initiatorVersion, compacts.get(0).getInitiatorVersion());
+ assertEquals(workerVersion, compacts.get(0).getWorkerVersion());
}
@@ -1072,7 +1081,7 @@ public void droppedTable() throws Exception {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(0, compacts.size());
+ assertEquals(0, compacts.size());
}
@Test
@@ -1097,7 +1106,7 @@ public void droppedPartition() throws Exception {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(0, compacts.size());
+ assertEquals(0, compacts.size());
}
@Test
@@ -1148,8 +1157,8 @@ public void insertOnlyDisabled() throws Exception {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("failed", compacts.get(0).getState());
+ assertEquals(1, compacts.size());
+ assertEquals("failed", compacts.get(0).getState());
}
@@ -1162,21 +1171,21 @@ private void verifyTxn1IsAborted(int compactionNum, Table t, CompactionType type
// Compaction should not have run on a single delta file
FileSystem fs = FileSystem.get(conf);
FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
- Assert.assertEquals(1, stat.length);
- Assert.assertEquals(makeDeltaDirName(0, 2), stat[0].getPath().getName());
+ assertEquals(1, stat.length);
+ assertEquals(makeDeltaDirName(0, 2), stat[0].getPath().getName());
// State should not be "ready for cleaning" because we skip cleaning
List<ShowCompactResponseElement> compacts =
txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
- Assert.assertEquals(compactionNum + 1, compacts.size());
- Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(compactionNum).getState());
+ assertEquals(compactionNum + 1, compacts.size());
+ assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(compactionNum).getState());
// assert transaction with txnId=1 is still aborted after cleaner is run
startCleaner();
List<TxnInfo> openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns();
- Assert.assertEquals(1, openTxns.get(0).getId());
- Assert.assertEquals(TxnState.ABORTED, openTxns.get(0).getState());
+ assertEquals(1, openTxns.get(0).getId());
+ assertEquals(TxnState.ABORTED, openTxns.get(0).getState());
}
// With high timeout, but fast run we should finish without a problem
@@ -1197,6 +1206,186 @@ public void testTimeoutWithoutInterrupt() throws Exception {
runTimeoutTest(1, true, true);
}
+ @Test
+ public void testExceptionWhenTxnCommitAndMarkFailed() throws Exception {
+ prepareTableAndCompaction("default", "tbforcomperror");
+ runWorkerWithException(MethodToFail.COMMIT_TXN, MethodToFail.MARK_FAILED);
+
+ List<ShowCompactResponseElement> compacts =
+ txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+ assertEquals(TxnStore.WORKING_RESPONSE, compacts.get(0).getState());
+ List<TxnInfo> openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns();
+ assertEquals(1, openTxns.size());
+ TxnInfo txn = openTxns.get(0);
+ assertEquals(compacts.get(0).getTxnId(), txn.getId());
+ assertEquals(TxnState.OPEN, txn.getState());
+ txnHandler.abortTxn(new AbortTxnRequest(txn.getId()));
+ }
+
+ @Test
+ public void testExceptionWhenTxnCommit() throws Exception {
+ prepareTableAndCompaction("default", "tbforcomperror");
+ runWorkerWithException(MethodToFail.COMMIT_TXN);
+
+ List<ShowCompactResponseElement> compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+ ShowCompactResponseElement compaction = compacts.get(0);
+ assertEquals(TxnStore.FAILED_RESPONSE, compaction.getState());
+ assertEquals("Simulated failure in commitTxn", compaction.getErrorMessage());
+ List<TxnInfo> openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns();
+ assertEquals(1, openTxns.size());
+ TxnInfo txn = openTxns.get(0);
+ assertEquals(compaction.getTxnId(), txn.getId());
+ assertEquals(TxnState.OPEN, txn.getState());
+ txnHandler.abortTxn(new AbortTxnRequest(txn.getId()));
+ }
+
+ @Test
+ public void testExceptionWhenMarkCompacted() throws Exception {
+ prepareTableAndCompaction("default", "tbforcomperror");
+ runWorkerWithException(MethodToFail.MARK_COMPACTED);
+
+ List<ShowCompactResponseElement> compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+ ShowCompactResponseElement compaction = compacts.get(0);
+ assertEquals(TxnStore.FAILED_RESPONSE, compaction.getState());
+ assertEquals("Simulated failure in markCompacted", compaction.getErrorMessage());
+ List<TxnInfo> openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns();
+ assertEquals(0, openTxns.size());
+ }
+
+ @Test
+ public void testExceptionDuringCompact() throws Exception {
+ prepareTableAndCompaction("default", "tbforcomperror");
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST, true);
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TEST_MODE_FAIL_COMPACTION, true);
+ startWorker();
+
+ List<ShowCompactResponseElement> compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+ ShowCompactResponseElement compaction = compacts.get(0);
+ assertEquals(TxnStore.FAILED_RESPONSE, compaction.getState());
+ assertEquals("HIVE_TEST_MODE_FAIL_COMPACTION=true", compaction.getErrorMessage());
+ List<TxnInfo> openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns();
+ assertEquals(1, openTxns.size());
+ TxnInfo txn = openTxns.get(0);
+ assertEquals(compaction.getTxnId(), txn.getId());
+ assertEquals(TxnState.ABORTED, txn.getState());
+ }
+
+ @Test
+ public void testWorkerIfIsDynPartAbort() throws Exception {
+ String dbName = "default";
+ String tableName = "tbforcomperror";
+ Table t = newTable(dbName, tableName, true);
+ addBaseFile(t, null, 1L, 3, 1);
+ addDeltaFile(t, null, 2L, 2L, 1);
+ addDeltaFile(t, null, 3L, 3L, 1);
+ addDeltaFile(t, null, 4L, 4L, 1);
+ burnThroughTransactions(dbName, tableName, 4, null, null);
+ // trigger compaction
+ CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+ rqst.setPartitionname(null);
+ txnHandler.compact(rqst);
+ startWorker();
+
+ List<ShowCompactResponseElement> compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+ ShowCompactResponseElement compaction = compacts.get(0);
+ assertEquals(TxnStore.CLEANING_RESPONSE, compaction.getState());
+ assertTrue(compaction.getNextTxnId() > 0L);
+ List<TxnInfo> openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns();
+ assertEquals(0, openTxns.size());
+ }
+
+ @Test
+ public void testWorkerNotEnoughToCompact() throws Exception {
+ String dbName = "default";
+ String tableName = "tbforcomperror";
+ Table t = newTable(dbName, tableName, false);
+ addBaseFile(t, null, 1L, 3, 1);
+ burnThroughTransactions(dbName, tableName, 1, null, null);
+ // trigger compaction
+ CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+ txnHandler.compact(rqst);
+ startWorker();
+
+ List<ShowCompactResponseElement> compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+ ShowCompactResponseElement compaction = compacts.get(0);
+ assertEquals(TxnStore.REFUSED_RESPONSE, compaction.getState());
+ assertTrue(compaction.getErrorMessage().contains("None of the compaction thresholds met, compaction request is refused!"));
+ List<TxnInfo> openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns();
+ assertEquals(0, openTxns.size());
+ }
+
+ @Test
+ public void testWorkerNotEnoughToCompactNeedsCleaning() throws Exception {
+ String dbName = "default";
+ String tableName = "tbforcomperror";
+ Table t = newTable(dbName, tableName, false);
+ addDeltaFile(t, null, 20L, 20L, 10);
+ addDeltaFile(t, null, 21L, 21L, 10);
+ addDeltaFile(t, null, 22L, 22L, 10);
+ addDeltaFile(t, null, 23L, 23L, 10);
+ addDeltaFile(t, null, 24L, 24L, 10);
+ burnThroughTransactions(dbName, tableName, 25, null, new HashSet<Long>(Arrays.asList(20L, 21L, 22L, 23L, 24L)));
+ // trigger compaction
+ CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+ txnHandler.compact(rqst);
+ startWorker();
+
+ List<ShowCompactResponseElement> compacts = txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+ ShowCompactResponseElement compaction = compacts.get(0);
+ assertEquals(TxnStore.CLEANING_RESPONSE, compaction.getState());
+ assertTrue(compaction.getNextTxnId() > 0L);
+ List<TxnInfo> openTxns = HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns();
+ assertEquals(5, openTxns.size());
+ assertEquals(20L, openTxns.get(0).getId());
+ assertEquals(21L, openTxns.get(1).getId());
+ assertEquals(22L, openTxns.get(2).getId());
+ assertEquals(23L, openTxns.get(3).getId());
+ assertEquals(24L, openTxns.get(4).getId());
+
+ }
+
+ private void runWorkerWithException(MethodToFail... methodToFail) throws Exception {
+ IMetaStoreClient spyMsc = Mockito.spy(ms);
+ for (MethodToFail method: methodToFail) {
+ switch (method) {
+ case MARK_FAILED -> doThrow(new TTransportException("Simulated failure in markFailed")).when(spyMsc).markFailed(any());
+ case COMMIT_TXN -> doThrow(new TException("Simulated failure in commitTxn")).when(spyMsc).commitTxn(anyLong());
+ case MARK_COMPACTED -> doThrow(new TTransportException("Simulated failure in markCompacted")).when(spyMsc).markCompacted(any());
+ }
+ }
+
+ TestTxnDbUtil.setConfValues(conf);
+ Worker worker = Mockito.spy(new Worker());
+ worker.setConf(conf);
+ AtomicBoolean stop = new AtomicBoolean();
+ stop.set(true);
+ worker.init(stop);
+ worker.msc = spyMsc;
+ worker.setName("testworker");
+ CompactorThread ct = worker;
+ ct.run();
+ }
+
+ private void prepareTableAndCompaction(String dbName, String tableName) throws Exception {
+ Table t = newTable(dbName, tableName, false);
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 23L, 2);
+ addDeltaFile(t, null, 23L, 23L, 3);
+ addDeltaFile(t, null, 24L, 24L, 2);
+ addDeltaFile(t, null, 25L, 25L, 3);
+ addDeltaFile(t, null, 26L, 26L, 3);
+ burnThroughTransactions(dbName, tableName, 27, null, null);
+ // trigger compaction
+ CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+ txnHandler.compact(rqst);
+ }
+
+ enum MethodToFail {
+ MARK_COMPACTED,
+ MARK_FAILED,
+ COMMIT_TXN;
+ }
+
private void runTimeoutTest(long timeout, boolean runForever, boolean swallowInterrupt) throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
HiveConf timeoutConf = new HiveConf(conf);