This is an automated email from the ASF dual-hosted git repository.
krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new f9d8afebba Several misc Fate changes (#4912)
f9d8afebba is described below
commit f9d8afebbad00d2db476e23b8bedff839441b2c1
Author: Kevin Rathbun <[email protected]>
AuthorDate: Thu Oct 3 10:04:41 2024 -0400
Several misc Fate changes (#4912)
- Add a toString() to FateKey
- Move MetaFateStore to org.apache.accumulo.core.fate.zookeeper
- Periodic clean up of dead reservations increased from every 30 seconds to
every few minutes
- New fate test case added to FateIT that ensures no write ops can be
performed on a transaction after it has been deleted
- Added new check to verifyReserved() that checks whether the
transaction is deleted
- Fixed UserFateStoreIT to work with new change and misc cleanup to
the class
- created new class FastFate which performs the dead reservation cleanup
more often (used in testing)
---
.../accumulo/core/fate/AbstractFateStore.java | 14 +++----
.../java/org/apache/accumulo/core/fate/Fate.java | 8 +++-
.../org/apache/accumulo/core/fate/FateKey.java | 11 +++++
.../accumulo/core/fate/user/UserFateStore.java | 18 ++++-----
.../core/fate/{ => zookeeper}/MetaFateStore.java | 30 ++++++++------
.../org/apache/accumulo/server/util/Admin.java | 2 +-
.../java/org/apache/accumulo/manager/Manager.java | 2 +-
.../manager/metrics/fate/meta/MetaFateMetrics.java | 2 +-
.../test/compaction/ExternalCompaction_1_IT.java | 2 +-
.../org/apache/accumulo/test/fate/FastFate.java | 43 ++++++++++++++++++++
.../java/org/apache/accumulo/test/fate/FateIT.java | 32 +++++++++++++++
.../accumulo/test/fate/FateOpsCommandsIT.java | 2 +-
.../accumulo/test/fate/MultipleStoresIT.java | 24 +++++------
.../apache/accumulo/test/fate/meta/MetaFateIT.java | 2 +-
.../test/fate/meta/MetaFateInterleavingIT.java | 2 +-
.../test/fate/meta/MetaFateOpsCommandsIT.java | 2 +-
.../test/fate/meta/MetaFateStoreFateIT.java | 2 +-
.../accumulo/test/fate/user/UserFateStoreIT.java | 47 +++++++---------------
.../test/functional/FateConcurrencyIT.java | 2 +-
.../test/functional/FunctionalTestUtils.java | 2 +-
20 files changed, 162 insertions(+), 87 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
index 97b391e218..ff5e45d310 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
@@ -310,7 +310,7 @@ public abstract class AbstractFateStore<T> implements
FateStore<T> {
public TStatus waitForStatusChange(EnumSet<TStatus> expected) {
Preconditions.checkState(!isReserved(),
"Attempted to wait for status change while reserved: " + fateId);
- verifyReserved(false);
+ verifyReservedAndNotDeleted(false);
int currNumCallers = concurrentStatusChangeCallers.incrementAndGet();
@@ -375,16 +375,14 @@ public abstract class AbstractFateStore<T> implements
FateStore<T> {
protected abstract void unreserve();
- protected void verifyReserved(boolean isWrite) {
- if (!isReserved() && isWrite) {
- throw new IllegalStateException(
- "Attempted write on unreserved FATE transaction: " + fateId);
- }
+ protected void verifyReservedAndNotDeleted(boolean isWrite) {
+ Preconditions.checkState(!isWrite || (isReserved() && !deleted),
+ "Attempted write on unreserved or deleted FATE transaction: " +
fateId);
}
@Override
public TStatus getStatus() {
- verifyReserved(false);
+ verifyReservedAndNotDeleted(false);
var status = _getStatus(fateId);
observedStatus = status;
return status;
@@ -392,7 +390,7 @@ public abstract class AbstractFateStore<T> implements
FateStore<T> {
@Override
public Optional<FateKey> getKey() {
- verifyReserved(false);
+ verifyReservedAndNotDeleted(false);
return AbstractFateStore.this.getKey(fateId);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index e2d4e7cbe5..b6860c557d 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -383,8 +383,8 @@ public class Fate<T> {
// reservations held by dead processes, if they exist.
deadResCleanerExecutor =
ThreadPools.getServerThreadPools().createScheduledExecutorService(1,
store.type() + "-dead-reservation-cleaner-pool");
- ScheduledFuture<?> deadReservationCleaner = deadResCleanerExecutor
- .scheduleWithFixedDelay(new DeadReservationCleaner(), 3, 30,
SECONDS);
+ ScheduledFuture<?> deadReservationCleaner =
deadResCleanerExecutor.scheduleWithFixedDelay(
+ new DeadReservationCleaner(), 3,
getDeadResCleanupDelay().toSeconds(), SECONDS);
ThreadPools.watchCriticalScheduledTask(deadReservationCleaner);
}
this.deadResCleanerExecutor = deadResCleanerExecutor;
@@ -393,6 +393,10 @@ public class Fate<T> {
this.workFinder.start();
}
+ public Duration getDeadResCleanupDelay() {
+ return Duration.ofMinutes(3);
+ }
+
// get a transaction id back to the requester before doing any work
public FateId startTransaction() {
return store.create();
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java
b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java
index 6c1663627c..8942149a6f 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java
@@ -28,6 +28,8 @@ import java.util.Optional;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.hadoop.io.DataInputBuffer;
public class FateKey {
@@ -168,4 +170,13 @@ public class FateKey {
throw new IllegalStateException("Unexpected FateInstanceType found " +
type);
}
}
+
+ @Override
+ public String toString() {
+ var buf = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
+ buf.append("FateKeyType", type);
+ keyExtent.ifPresentOrElse(keyExtent -> buf.append("KeyExtent", keyExtent),
+ () -> buf.append("ExternalCompactionID", compactionId.orElseThrow()));
+ return buf.toString();
+ }
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
index f1f82758ff..e1cb4d6405 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
@@ -420,7 +420,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
@Override
public Repo<T> top() {
- verifyReserved(false);
+ verifyReservedAndNotDeleted(false);
return scanTx(scanner -> {
scanner.setRange(getRow(fateId));
@@ -436,7 +436,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
@Override
public List<ReadOnlyRepo<T>> getStack() {
- verifyReserved(false);
+ verifyReservedAndNotDeleted(false);
return scanTx(scanner -> {
scanner.setRange(getRow(fateId));
@@ -451,7 +451,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
@Override
public Serializable getTransactionInfo(TxInfo txInfo) {
- verifyReserved(false);
+ verifyReservedAndNotDeleted(false);
try (Scanner scanner = context.createScanner(tableName,
Authorizations.EMPTY)) {
scanner.setRange(getRow(fateId));
@@ -487,7 +487,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
@Override
public long timeCreated() {
- verifyReserved(false);
+ verifyReservedAndNotDeleted(false);
return scanTx(scanner -> {
scanner.setRange(getRow(fateId));
@@ -499,7 +499,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
@Override
public void push(Repo<T> repo) throws StackOverflowException {
- verifyReserved(true);
+ verifyReservedAndNotDeleted(true);
Optional<Integer> top = findTop();
@@ -514,7 +514,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
@Override
public void pop() {
- verifyReserved(true);
+ verifyReservedAndNotDeleted(true);
Optional<Integer> top = findTop();
top.ifPresent(t -> newMutator(fateId)
@@ -523,7 +523,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
@Override
public void setStatus(TStatus status) {
- verifyReserved(true);
+ verifyReservedAndNotDeleted(true);
newMutator(fateId).putStatus(status).mutate();
observedStatus = status;
@@ -531,7 +531,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
@Override
public void setTransactionInfo(TxInfo txInfo, Serializable so) {
- verifyReserved(true);
+ verifyReservedAndNotDeleted(true);
final byte[] serialized = serializeTxInfo(so);
@@ -540,7 +540,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
@Override
public void delete() {
- verifyReserved(true);
+ verifyReservedAndNotDeleted(true);
var mutator = newMutator(fateId);
mutator.requireStatus(TStatus.NEW, TStatus.SUBMITTED,
TStatus.SUCCESSFUL, TStatus.FAILED);
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
similarity index 96%
rename from core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java
rename to
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
index 08247e7441..d19db17004 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java
+++
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.accumulo.core.fate;
+package org.apache.accumulo.core.fate.zookeeper;
import static
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -39,9 +39,15 @@ import java.util.function.Supplier;
import java.util.stream.Stream;
import
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
+import org.apache.accumulo.core.fate.AbstractFateStore;
+import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.Fate.TxInfo;
-import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.fate.FateKey;
+import org.apache.accumulo.core.fate.ReadOnlyRepo;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.fate.StackOverflowException;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.hadoop.io.DataInputBuffer;
@@ -239,7 +245,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
@Override
public Repo<T> top() {
- verifyReserved(false);
+ verifyReservedAndNotDeleted(false);
for (int i = 0; i < RETRIES; i++) {
String txpath = getTXPath(fateId);
@@ -291,7 +297,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
@Override
public void push(Repo<T> repo) throws StackOverflowException {
- verifyReserved(true);
+ verifyReservedAndNotDeleted(true);
String txpath = getTXPath(fateId);
try {
@@ -310,7 +316,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
@Override
public void pop() {
- verifyReserved(true);
+ verifyReservedAndNotDeleted(true);
try {
String txpath = getTXPath(fateId);
@@ -326,7 +332,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
@Override
public void setStatus(TStatus status) {
- verifyReserved(true);
+ verifyReservedAndNotDeleted(true);
try {
zk.mutateExisting(getTXPath(fateId), currSerializedData -> {
@@ -353,7 +359,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
@Override
public void delete() {
- verifyReserved(true);
+ verifyReservedAndNotDeleted(true);
try {
zk.recursiveDelete(getTXPath(fateId), NodeMissingPolicy.SKIP);
@@ -365,7 +371,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
@Override
public void setTransactionInfo(Fate.TxInfo txInfo, Serializable so) {
- verifyReserved(true);
+ verifyReservedAndNotDeleted(true);
try {
zk.putPersistentData(getTXPath(fateId) + "/" + txInfo,
serializeTxInfo(so),
@@ -377,14 +383,14 @@ public class MetaFateStore<T> extends
AbstractFateStore<T> {
@Override
public Serializable getTransactionInfo(Fate.TxInfo txInfo) {
- verifyReserved(false);
+ verifyReservedAndNotDeleted(false);
return MetaFateStore.this.getTransactionInfo(txInfo, fateId);
}
@Override
public long timeCreated() {
- verifyReserved(false);
+ verifyReservedAndNotDeleted(false);
try {
Stat stat = zk.getZooKeeper().exists(getTXPath(fateId), false);
@@ -396,7 +402,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
@Override
public List<ReadOnlyRepo<T>> getStack() {
- verifyReserved(false);
+ verifyReservedAndNotDeleted(false);
String txpath = getTXPath(fateId);
outer: while (true) {
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
index f2aa438661..5e26567ac4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
@@ -71,9 +71,9 @@ import org.apache.accumulo.core.fate.AdminUtil;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateStore;
-import org.apache.accumulo.core.fate.MetaFateStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore;
import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.lock.ServiceLock;
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 0af05eb5b7..ae75437225 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -76,8 +76,8 @@ import org.apache.accumulo.core.fate.FateCleaner;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateStore;
-import org.apache.accumulo.core.fate.MetaFateStore;
import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java
index 1087cf1b9b..02aa3a28f4 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java
@@ -26,8 +26,8 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.fate.AbstractFateStore;
-import org.apache.accumulo.core.fate.MetaFateStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.manager.metrics.fate.FateMetrics;
import org.apache.accumulo.server.ServerContext;
import org.apache.zookeeper.KeeperException;
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
index 314212693a..e8955e465a 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
@@ -78,8 +78,8 @@ import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateKey;
import org.apache.accumulo.core.fate.FateStore;
-import org.apache.accumulo.core.fate.MetaFateStore;
import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.iterators.DevNull;
import org.apache.accumulo.core.iterators.Filter;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java
b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java
new file mode 100644
index 0000000000..71b198c0ac
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import java.time.Duration;
+import java.util.function.Function;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.Repo;
+
+/**
+ * A FATE which performs the dead reservation cleanup with a much shorter
delay between
+ */
+public class FastFate<T> extends Fate<T> {
+
+ public FastFate(T environment, FateStore<T> store, boolean runDeadResCleaner,
+ Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) {
+ super(environment, store, runDeadResCleaner, toLogStrFunc, conf);
+ }
+
+ @Override
+ public Duration getDeadResCleanupDelay() {
+ return Duration.ofSeconds(15);
+ }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
index bd7c4a2395..d36e98bdec 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
@@ -24,8 +24,10 @@ import static
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRES
import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW;
import static
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED;
import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
@@ -43,6 +45,7 @@ import org.apache.accumulo.core.fate.AbstractFateStore;
import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.harness.SharedMiniClusterBase;
@@ -479,6 +482,35 @@ public abstract class FateIT extends SharedMiniClusterBase
implements FateTestRu
}
}
+ @Test
+ @Timeout(30)
+ public void testNoWriteAfterDelete() throws Exception {
+ executeTest(this::testNoWriteAfterDelete);
+ }
+
+ protected void testNoWriteAfterDelete(FateStore<TestEnv> store,
ServerContext sctx)
+ throws Exception {
+ final String tableName = getUniqueNames(1)[0];
+ final FateId fateId = store.create();
+ final Repo<TestEnv> repo = new TestRepo("testNoWriteAfterDelete");
+
+ var txStore = store.reserve(fateId);
+
+ // all write ops should be ok after reservation
+ assertDoesNotThrow(() -> txStore.push(repo));
+ assertDoesNotThrow(() ->
txStore.setStatus(ReadOnlyFateStore.TStatus.SUCCESSFUL));
+ assertDoesNotThrow(txStore::pop);
+ assertDoesNotThrow(() -> txStore.setTransactionInfo(Fate.TxInfo.TX_NAME,
"name"));
+ assertDoesNotThrow(txStore::delete);
+
+ // test that all write ops result in an exception since the tx has been
deleted
+ assertThrows(Exception.class, () -> txStore.push(repo));
+ assertThrows(Exception.class, () ->
txStore.setStatus(ReadOnlyFateStore.TStatus.SUCCESSFUL));
+ assertThrows(Exception.class, txStore::pop);
+ assertThrows(Exception.class, () ->
txStore.setTransactionInfo(Fate.TxInfo.TX_NAME, "name"));
+ assertThrows(Exception.class, txStore::delete);
+ }
+
private void submitDeferred(Fate<TestEnv> fate, ServerContext sctx,
Set<FateId> transactions) {
FateId fateId = fate.startTransaction();
transactions.add(fateId);
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
index 8b52f88f97..0db83c044a 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
@@ -59,9 +59,9 @@ import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateStore;
-import org.apache.accumulo.core.fate.MetaFateStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore;
import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.iterators.IteratorUtil;
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
index 57070bacde..f5e537394d 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
@@ -46,10 +46,10 @@ import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateStore;
-import org.apache.accumulo.core.fate.MetaFateStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.harness.SharedMiniClusterBase;
@@ -111,7 +111,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase
{
final int numFateIds = 500;
final FateId fakeFateId = FateId.from(storeType, UUID.randomUUID());
final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new
ArrayList<>();
- final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+ final boolean isUserStore = storeType == FateInstanceType.USER;
final Set<FateId> allIds = new HashSet<>();
final FateStore<SleepingTestEnv> store1, store2;
final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
@@ -182,7 +182,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase
{
// Tests that reserve() doesn't hang indefinitely and instead throws an
error
// on reserve() a non-existent transaction.
final FateStore<SleepingTestEnv> store;
- final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+ final boolean isUserStore = storeType == FateInstanceType.USER;
final String tableName = getUniqueNames(1)[0];
final FateId fakeFateId = FateId.from(storeType, UUID.randomUUID());
final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);
@@ -208,7 +208,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase
{
throws Exception {
final String tableName = getUniqueNames(1)[0];
final int numFateIds = 500;
- final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+ final boolean isUserStore = storeType == FateInstanceType.USER;
final Set<FateId> allIds = new HashSet<>();
final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new
ArrayList<>();
final FateStore<SleepingTestEnv> store;
@@ -256,7 +256,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase
{
throws Exception {
final String tableName = getUniqueNames(1)[0];
final int numFateIds = 500;
- final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+ final boolean isUserStore = storeType == FateInstanceType.USER;
final Set<FateId> allIds = new HashSet<>();
final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new
ArrayList<>();
final FateStore<SleepingTestEnv> store;
@@ -312,7 +312,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase
{
private void testMultipleFateInstances(FateInstanceType storeType) throws
Exception {
final String tableName = getUniqueNames(1)[0];
final int numFateIds = 500;
- final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+ final boolean isUserStore = storeType == FateInstanceType.USER;
final Set<FateId> allIds = new HashSet<>();
final FateStore<SleepingTestEnv> store1, store2;
final SleepingTestEnv testEnv1 = new SleepingTestEnv(50);
@@ -380,7 +380,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase
{
// One transaction for each FATE worker thread
final int numFateIds =
Integer.parseInt(Property.MANAGER_FATE_THREADPOOL_SIZE.getDefaultValue());
- final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+ final boolean isUserStore = storeType == FateInstanceType.USER;
final Set<FateId> allIds = new HashSet<>();
final FateStore<LatchTestEnv> store1, store2;
final LatchTestEnv testEnv1 = new LatchTestEnv();
@@ -399,8 +399,8 @@ public class MultipleStoresIT extends SharedMiniClusterBase
{
}
liveLocks.add(lock1);
- Fate<LatchTestEnv> fate1 =
- new Fate<>(testEnv1, store1, true, Object::toString,
DefaultConfiguration.getInstance());
+ FastFate<LatchTestEnv> fate1 = new FastFate<>(testEnv1, store1, true,
Object::toString,
+ DefaultConfiguration.getInstance());
// Ensure nothing is reserved yet
assertTrue(store1.getActiveReservations().isEmpty());
@@ -445,8 +445,8 @@ public class MultipleStoresIT extends SharedMiniClusterBase
{
// Create the new Fate/start the Fate threads (the work finder and the
workers).
// Don't run another dead reservation cleaner since we already have one
running from fate1.
- Fate<LatchTestEnv> fate2 =
- new Fate<>(testEnv2, store2, false, Object::toString,
DefaultConfiguration.getInstance());
+ FastFate<LatchTestEnv> fate2 = new FastFate<>(testEnv2, store2, false,
Object::toString,
+ DefaultConfiguration.getInstance());
// Wait for the "dead" reservations to be deleted and picked up again
(reserved using
// fate2/store2/lock2 now).
@@ -458,7 +458,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase
{
boolean allReservedWithLock2 = store2Reservations.values().stream()
.allMatch(entry ->
FateStore.FateReservation.locksAreEqual(entry.getLockID(), lock2));
return store2Reservations.keySet().equals(allIds) &&
allReservedWithLock2;
- }, 60_000);
+ }, fate1.getDeadResCleanupDelay().toMillis() * 2);
// Finish work and shutdown
testEnv1.workersLatch.countDown();
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java
index a23dde0644..c5f541b5e9 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java
@@ -32,8 +32,8 @@ import java.util.UUID;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator;
import org.apache.accumulo.core.fate.FateId;
-import org.apache.accumulo.core.fate.MetaFateStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.fate.FateIT;
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java
index bfd267630f..d306e0bfef 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java
@@ -24,7 +24,7 @@ import java.util.UUID;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.fate.AbstractFateStore;
-import org.apache.accumulo.core.fate.MetaFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.fate.FateInterleavingIT;
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java
index 994c7af2eb..c4c1e5b24a 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java
@@ -22,7 +22,7 @@ import static
org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.fate.AbstractFateStore;
-import org.apache.accumulo.core.fate.MetaFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.fate.FateOpsCommandsIT;
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java
index beb48a5304..af8b98db0f 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java
@@ -36,8 +36,8 @@ import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateStore;
-import org.apache.accumulo.core.fate.MetaFateStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.ServerContext;
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java
index 55f89cd605..c82662182c 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java
@@ -24,9 +24,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.EnumSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.BatchWriter;
@@ -40,6 +37,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.fate.AbstractFateStore;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateStore;
@@ -76,29 +74,6 @@ public class UserFateStoreIT extends SharedMiniClusterBase {
SharedMiniClusterBase.stopMiniCluster();
}
- private static class TestUserFateStore extends UserFateStore<TestEnv> {
- private final Iterator<FateId> fateIdIterator;
-
- // use the list of fateIds to simulate collisions on fateIds
- public TestUserFateStore(ClientContext context, String tableName,
List<FateId> fateIds) {
- super(context, tableName, createDummyLockID(), null);
- this.fateIdIterator = fateIds.iterator();
- }
-
- @Override
- public FateId getFateId() {
- if (fateIdIterator.hasNext()) {
- return fateIdIterator.next();
- } else {
- return FateId.from(fateInstanceType, UUID.randomUUID());
- }
- }
-
- public TStatus getStatus(FateId fateId) {
- return _getStatus(fateId);
- }
- }
-
// Test that configs related to the correctness of the FATE instance user
table
// are initialized correctly
@Test
@@ -151,7 +126,7 @@ public class UserFateStoreIT extends SharedMiniClusterBase {
String tableName;
ClientContext client;
FateId fateId;
- TestUserFateStore store;
+ UserFateStore<TestEnv> store;
FateStore.FateTxStore<FateIT.TestEnv> txStore;
@BeforeEach
@@ -159,9 +134,8 @@ public class UserFateStoreIT extends SharedMiniClusterBase {
client = (ClientContext)
Accumulo.newClient().from(getClientProps()).build();
tableName = getUniqueNames(1)[0];
createFateTable(client, tableName);
- fateId = FateId.from(fateInstanceType, UUID.randomUUID());
- store = new TestUserFateStore(client, tableName, List.of(fateId));
- store.create();
+ store = new UserFateStore<>(client, tableName,
AbstractFateStore.createDummyLockID(), null);
+ fateId = store.create();
txStore = store.reserve(fateId);
}
@@ -177,7 +151,10 @@ public class UserFateStoreIT extends SharedMiniClusterBase
{
beforeOperation.run();
injectStatus(client, tableName, fateId, status);
- assertEquals(status, store.getStatus(fateId));
+ var fateIdStatus =
+ store.list().filter(statusEntry ->
statusEntry.getFateId().equals(fateId)).findFirst()
+ .orElseThrow();
+ assertEquals(status, fateIdStatus.getStatus());
if (!acceptableStatuses.contains(status)) {
assertThrows(IllegalStateException.class, operation,
"Expected operation to fail with status " + status + " but it
did not");
@@ -210,8 +187,12 @@ public class UserFateStoreIT extends SharedMiniClusterBase
{
@Test
public void delete() throws Exception {
- testOperationWithStatuses(() -> {}, // No special setup needed for delete
- txStore::delete,
+ testOperationWithStatuses(() -> {
+ // Setup for delete: Create a new txStore before each delete since
delete cannot be called
+ // on the same txStore more than once
+ fateId = store.create();
+ txStore = store.reserve(fateId);
+ }, () -> txStore.delete(),
EnumSet.of(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL,
TStatus.FAILED));
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
index c5e6e5eea1..5e5775110f 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
@@ -51,9 +51,9 @@ import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.AdminUtil;
import org.apache.accumulo.core.fate.FateInstanceType;
-import org.apache.accumulo.core.fate.MetaFateStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore;
import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.manager.state.tables.TableState;
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
index 9172a2d7b4..28b08dbbf0 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
@@ -62,9 +62,9 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.AdminUtil;
import org.apache.accumulo.core.fate.AdminUtil.FateStatus;
import org.apache.accumulo.core.fate.FateInstanceType;
-import org.apache.accumulo.core.fate.MetaFateStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore;
import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;