This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 8a69501490 Add more conditions to AccumuloStore (#4242)
8a69501490 is described below
commit 8a695014908b3e78197925527be4be37ec01e126
Author: Dom G <[email protected]>
AuthorDate: Wed Feb 21 10:39:10 2024 -0500
Add more conditions to AccumuloStore (#4242)
* Add require status condition to AccumuloStore.pop(), delete() and push()
* Add tests to make sure statuses are checked
---
.../accumulo/core/fate/accumulo/AccumuloStore.java | 10 +-
.../core/fate/accumulo/StatusMappingIterator.java | 2 +
.../test/fate/accumulo/AccumuloStoreIT.java | 108 ++++++++++++++++++++-
.../accumulo/test/fate/accumulo/FateStoreIT.java | 5 +
4 files changed, 121 insertions(+), 4 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
index 7fd4b967cb..8ae222c61c 100644
---
a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
+++
b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
@@ -351,7 +351,8 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
throw new StackOverflowException("Repo stack size too large");
}
- FateMutator<T> fateMutator = newMutator(fateId);
+ FateMutator<T> fateMutator =
+ newMutator(fateId).requireStatus(TStatus.IN_PROGRESS, TStatus.NEW);
fateMutator.putRepo(top.map(t -> t + 1).orElse(1), repo).mutate();
}
@@ -360,7 +361,8 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
verifyReserved(true);
Optional<Integer> top = findTop();
- top.ifPresent(t -> newMutator(fateId).deleteRepo(t).mutate());
+ top.ifPresent(
+ t ->
newMutator(fateId).requireStatus(TStatus.FAILED_IN_PROGRESS).deleteRepo(t).mutate());
}
@Override
@@ -384,7 +386,9 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
public void delete() {
verifyReserved(true);
- newMutator(fateId).delete().mutate();
+ var mutator = newMutator(fateId);
+ mutator.requireStatus(TStatus.NEW, TStatus.SUBMITTED,
TStatus.SUCCESSFUL, TStatus.FAILED);
+ mutator.delete().mutate();
}
private Optional<Integer> findTop() {
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java
b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java
index d7dc4fa22c..073f879318 100644
---
a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java
+++
b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java
@@ -65,6 +65,8 @@ public class StatusMappingIterator implements
SortedKeyValueIterator<Key,Value>
if (options.containsKey(STATUS_SET_KEY)) {
String[] statuses = decodeStatuses(options.get(STATUS_SET_KEY));
acceptableStatuses.addAll(Arrays.asList(statuses));
+ } else {
+ throw new IllegalArgumentException("Expected option " + STATUS_SET_KEY +
" to be set.");
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java
index af9280f850..a501526cba 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.test.fate.accumulo;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -28,14 +29,28 @@ import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore;
import org.apache.accumulo.core.fate.accumulo.AccumuloStore;
+import org.apache.accumulo.core.fate.accumulo.schema.FateSchema;
import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.test.fate.FateIT;
+import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +69,7 @@ public class AccumuloStoreIT extends SharedMiniClusterBase {
SharedMiniClusterBase.stopMiniCluster();
}
- private static class TestAccumuloStore extends AccumuloStore<String> {
+ private static class TestAccumuloStore extends AccumuloStore<FateIT.TestEnv>
{
private final Iterator<FateId> fateIdIterator;
// use the list of fateIds to simulate collisions on fateIds
@@ -71,6 +86,10 @@ public class AccumuloStoreIT extends SharedMiniClusterBase {
return FateId.from(fateInstanceType, -1L);
}
}
+
+ public TStatus getStatus(FateId fateId) {
+ return _getStatus(fateId);
+ }
}
@Test
@@ -97,4 +116,91 @@ public class AccumuloStoreIT extends SharedMiniClusterBase {
assertThrows(IllegalStateException.class, store::create);
}
}
+
+ @Nested
+ class TestStatusEnforcement {
+
+ String tableName;
+ ClientContext client;
+ FateId fateId;
+ TestAccumuloStore store;
+ FateStore.FateTxStore<FateIT.TestEnv> txStore;
+
+ @BeforeEach
+ public void setup() throws Exception {
+ client = (ClientContext)
Accumulo.newClient().from(getClientProps()).build();
+ tableName = getUniqueNames(1)[0];
+ client.tableOperations().create(tableName);
+ fateId = FateId.from(fateInstanceType, 1L);
+ store = new TestAccumuloStore(client, tableName, List.of(fateId));
+ store.create();
+ txStore = store.reserve(fateId);
+ }
+
+ @AfterEach
+ public void teardown() throws Exception {
+ client.close();
+ }
+
+ private void testOperationWithStatuses(Runnable beforeOperation,
Executable operation,
+ Set<ReadOnlyFateStore.TStatus> acceptableStatuses) throws Exception {
+ for (ReadOnlyFateStore.TStatus status :
ReadOnlyFateStore.TStatus.values()) {
+ // Run any needed setup for the operation before each iteration
+ beforeOperation.run();
+
+ injectStatus(client, tableName, fateId, status);
+ assertEquals(status, store.getStatus(fateId));
+ if (!acceptableStatuses.contains(status)) {
+ assertThrows(IllegalStateException.class, operation,
+ "Expected operation to fail with status " + status + " but it
did not");
+ } else {
+ assertDoesNotThrow(operation,
+ "Expected operation to succeed with status " + status + " but it
did not");
+ }
+ }
+ }
+
+ @Test
+ public void push() throws Exception {
+ testOperationWithStatuses(() -> {}, // No special setup needed for push
+ () -> txStore.push(new FateIT.TestRepo("testOp")),
+ Set.of(ReadOnlyFateStore.TStatus.IN_PROGRESS,
ReadOnlyFateStore.TStatus.NEW));
+ }
+
+ @Test
+ public void pop() throws Exception {
+ testOperationWithStatuses(() -> {
+ // Setup for pop: Ensure there something to pop by first pushing
+ try {
+ injectStatus(client, tableName, fateId,
ReadOnlyFateStore.TStatus.NEW);
+ txStore.push(new FateIT.TestRepo("testOp"));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to setup for pop", e);
+ }
+ }, txStore::pop, Set.of(ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS));
+ }
+
+ @Test
+ public void delete() throws Exception {
+ testOperationWithStatuses(() -> {}, // No special setup needed for delete
+ txStore::delete,
+ Set.of(ReadOnlyFateStore.TStatus.NEW,
ReadOnlyFateStore.TStatus.SUBMITTED,
+ ReadOnlyFateStore.TStatus.SUCCESSFUL,
ReadOnlyFateStore.TStatus.FAILED));
+ }
+ }
+
+ /**
+ * Inject a status into the status col of the fate store table for a given
transaction id.
+ */
+ private void injectStatus(ClientContext client, String table, FateId fateId,
+ ReadOnlyFateStore.TStatus status) throws TableNotFoundException {
+ try (BatchWriter writer = client.createBatchWriter(table)) {
+ Mutation mutation = new Mutation(new Text("tx_" + fateId.getHexTid()));
+ FateSchema.TxColumnFamily.STATUS_COLUMN.put(mutation, new
Value(status.name()));
+ writer.addMutation(mutation);
+ } catch (MutationsRejectedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java
index 181a21b9c6..deda053717 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java
@@ -99,6 +99,7 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
assertEquals(1, store.list().count());
// Push a test FATE op and verify we can read it back
+ txStore.setStatus(TStatus.IN_PROGRESS);
txStore.push(new TestRepo("testOp"));
TestRepo op = (TestRepo) txStore.top();
assertNotNull(op);
@@ -114,6 +115,7 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
// Try setting a second test op to test getStack()
// when listing or popping TestOperation2 should be first
assertEquals(1, txStore.getStack().size());
+ txStore.setStatus(TStatus.IN_PROGRESS);
txStore.push(new TestOperation2());
// test top returns TestOperation2
ReadOnlyRepo<TestEnv> top = txStore.top();
@@ -126,6 +128,7 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
assertEquals(TestRepo.class, ops.get(1).getClass());
// test pop, TestOperation should be left
+ txStore.setStatus(TStatus.FAILED_IN_PROGRESS); // needed to satisfy the
condition on pop
txStore.pop();
ops = txStore.getStack();
assertEquals(1, ops.size());
@@ -136,8 +139,10 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
assertEquals(2, store.list().count());
// test delete
+ txStore.setStatus(TStatus.SUCCESSFUL); // needed to satisfy the condition
on delete
txStore.delete();
assertEquals(1, store.list().count());
+ txStore2.setStatus(TStatus.SUCCESSFUL); // needed to satisfy the condition
on delete
txStore2.delete();
assertEquals(0, store.list().count());
}