This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 00bac7ae56 FateIT Repo Error Test (#4282)
00bac7ae56 is described below
commit 00bac7ae5633147502ce57b28532942323f05030
Author: Kevin Rathbun <[email protected]>
AuthorDate: Wed Feb 21 16:03:08 2024 -0500
FateIT Repo Error Test (#4282)
---
.../accumulo/test/fate/zookeeper/FateIT.java | 141 +++++++++++++++++++--
1 file changed, 133 insertions(+), 8 deletions(-)
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
index 5e153b21b1..2dde8fabca 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
@@ -36,6 +36,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@@ -115,6 +117,59 @@ public class FateIT {
}
+ /**
+  * Test repo that forms a chain of {@code TOTAL_NUM_OPS} operations named "OP1", "OP2", ...
+  * Every operation before the last one returns its successor from call(); the last operation
+  * throws from call() or from isReady(), depending on the {@code ExceptionLocation} it was
+  * created with. Each undo() appends the operation name to {@code undoOrder} and counts down
+  * {@code undoLatch}, so a test can wait for the undo() calls and check their order.
+  */
+ public static class TestOperationFails extends ManagerRepo {
+   private static final long serialVersionUID = 1L;
+   private static final Logger LOG = LoggerFactory.getLogger(TestOperationFails.class);
+   // Operation names in the order their undo() ran. Not final: the test replaces the list
+   // between scenarios.
+   private static List<String> undoOrder = new ArrayList<>();
+   private static final int TOTAL_NUM_OPS = 3;
+   private final int opNum;
+   private final String opName;
+   private final ExceptionLocation location;
+
+   /**
+    * @param opNum 1-based position of this operation in the chain
+    * @param location the method in which the last operation of the chain throws
+    */
+   public TestOperationFails(int opNum, ExceptionLocation location) {
+     this.opNum = opNum;
+     this.opName = "OP" + opNum;
+     this.location = location;
+   }
+
+   /**
+    * Returns 0, unless this is the last operation of the chain and the failure location is
+    * IS_READY, in which case it throws.
+    */
+   @Override
+   public long isReady(long tid, Manager environment) throws Exception {
+     LOG.debug("{} {} Entered isReady()", opName, FateTxId.formatTid(tid));
+     if (location == ExceptionLocation.IS_READY && opNum >= TOTAL_NUM_OPS) {
+       throw new Exception(
+           opName + " " + FateTxId.formatTid(tid) + " isReady() failed - this is expected");
+     }
+     return 0;
+   }
+
+   /** Records that this operation was undone and signals the waiting test. */
+   @Override
+   public void undo(long tid, Manager environment) throws Exception {
+     LOG.debug("{} {} Entered undo()", opName, FateTxId.formatTid(tid));
+     // Add to the list before counting down so the entry is in place when the latch opens.
+     undoOrder.add(opName);
+     undoLatch.countDown();
+   }
+
+   /**
+    * Returns the next operation of the chain, unless this is the last operation and the failure
+    * location is CALL, in which case it throws.
+    */
+   @Override
+   public Repo<Manager> call(long tid, Manager environment) throws Exception {
+     LOG.debug("{} {} Entered call()", opName, FateTxId.formatTid(tid));
+     if (location == ExceptionLocation.CALL && opNum >= TOTAL_NUM_OPS) {
+       throw new Exception(
+           opName + " " + FateTxId.formatTid(tid) + " call() failed - this is expected");
+     }
+     // Derive the successor's number without mutating this instance, so calling call() again
+     // on the same instance yields the same successor.
+     return new TestOperationFails(opNum + 1, location);
+   }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(FateIT.class);
@TempDir
@@ -128,6 +183,11 @@ public class FateIT {
private static CountDownLatch callStarted;
private static CountDownLatch finishCall;
+ private static CountDownLatch undoLatch;
+
+ /** Selects the method (call() or isReady()) in which TestOperationFails throws. */
+ private enum ExceptionLocation {
+   CALL, IS_READY
+ }
@BeforeAll
public static void setup() throws Exception {
@@ -165,10 +225,6 @@ public class FateIT {
ConfigurationCopy config = new ConfigurationCopy();
config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
- fate.startTransactionRunners(config);
-
- // Wait for the transaction runner to be scheduled.
- UtilWaitThread.sleep(3000);
callStarted = new CountDownLatch(1);
finishCall = new CountDownLatch(1);
@@ -177,6 +233,11 @@ public class FateIT {
assertEquals(TStatus.NEW, getTxStatus(zk, txid));
fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID),
true, "Test Op");
assertEquals(TStatus.SUBMITTED, getTxStatus(zk, txid));
+
+ fate.startTransactionRunners(config);
+ // Wait for the transaction runner to be scheduled.
+ UtilWaitThread.sleep(3000);
+
// wait for call() to be called
callStarted.await();
assertEquals(IN_PROGRESS, getTxStatus(zk, txid));
@@ -346,10 +407,6 @@ public class FateIT {
ConfigurationCopy config = new ConfigurationCopy();
config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
- fate.startTransactionRunners(config);
-
- // Wait for the transaction runner to be scheduled.
- UtilWaitThread.sleep(3000);
callStarted = new CountDownLatch(1);
finishCall = new CountDownLatch(1);
@@ -359,6 +416,11 @@ public class FateIT {
assertEquals(NEW, getTxStatus(zk, txid));
fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID),
true, "Test Op");
assertEquals(SUBMITTED, getTxStatus(zk, txid));
+
+ fate.startTransactionRunners(config);
+ // Wait for the transaction runner to be scheduled.
+ UtilWaitThread.sleep(3000);
+
// wait for call() to be called
callStarted.await();
// cancel the transaction
@@ -369,6 +431,69 @@ public class FateIT {
}
+ @Test
+ public void testRepoFails() throws Exception {
+   /*
+    * This test ensures that when an exception occurs in a Repo's call() or isReady() method,
+    * undo() is called back up the chain of Repos in the correct order. The test works as
+    * follows: 1) Repo1 is called and returns Repo2, 2) Repo2 is called and returns Repo3,
+    * 3) Repo3 is called and throws an exception (in call() or isReady()). It is then expected
+    * that: 1) undo() is called on Repo3, 2) undo() is called on Repo2, 3) undo() is called on
+    * Repo1.
+    */
+   final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + Constants.ZFATE, zk);
+   final AgeOffStore<Manager> store =
+       new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis);
+
+   Manager manager = createMock(Manager.class);
+   ServerContext sctx = createMock(ServerContext.class);
+   expect(manager.getContext()).andReturn(sctx).anyTimes();
+   expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes();
+   expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
+   replay(manager, sctx);
+
+   Fate<Manager> fate = new Fate<Manager>(manager, store, TraceRepo::toLogString);
+   try {
+     ConfigurationCopy config = new ConfigurationCopy();
+     config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
+     config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
+     fate.startTransactionRunners(config);
+
+     // Wait for the transaction runner to be scheduled.
+     UtilWaitThread.sleep(3000);
+
+     // Test exception in call()
+     assertUndoOrderAfterFailure(fate, ExceptionLocation.CALL, "call() failed");
+     // Test exception in isReady()
+     assertUndoOrderAfterFailure(fate, ExceptionLocation.IS_READY, "isReady() failed");
+   } finally {
+     fate.shutdown();
+   }
+ }
+
+ /**
+  * Seeds a TestOperationFails chain that throws at the given location, waits for every undo()
+  * call, then checks the undo order, the final FAILED status and the recorded exception message.
+  *
+  * @param fate running Fate instance used to start and seed the transaction
+  * @param location the method in which the last operation of the chain throws
+  * @param expectedMessage text the transaction's exception message must contain
+  */
+ private void assertUndoOrderAfterFailure(Fate<Manager> fate, ExceptionLocation location,
+     String expectedMessage) throws Exception {
+   // Start every scenario from a clean slate instead of relying on the initial static state.
+   TestOperationFails.undoOrder = new ArrayList<>();
+   undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS);
+
+   long txid = fate.startTransaction();
+   assertEquals(NEW, getTxStatus(zk, txid));
+   fate.seedTransaction("TestOperationFails", txid, new TestOperationFails(1, location), false,
+       "Test Op Fails");
+
+   // Wait for all the undo() calls to complete
+   undoLatch.await();
+   assertEquals(List.of("OP3", "OP2", "OP1"), TestOperationFails.undoOrder);
+   assertEquals(FAILED, fate.waitForCompletion(txid));
+   assertTrue(fate.getException(txid).getMessage().contains(expectedMessage));
+ }
+
private static void inCall() throws InterruptedException {
// signal that call started
callStarted.countDown();