This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new b3f035be1e Accord Fixes: - cfk pruning+prebootstrap=invalid future dependency - exclude retired ranges when filtering RX stillTouches - propagate uses incorrect lowEpoch when fetch finds additional owned/touched ranges - node.withEpoch should callback with TopologyRetiredException, not throw - Recovery can race with durable-applied pruning; must not send durable unless latest ballot on apply - removeRedundantDependencies was not slicing pre-bootstrap range calculation to pa [...] b3f035be1e is described below commit b3f035be1ee2d0b08010a4ac773cec20a83ef606 Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Fri Apr 11 11:40:18 2025 +0100 Accord Fixes: - cfk pruning+prebootstrap=invalid future dependency - exclude retired ranges when filtering RX stillTouches - propagate uses incorrect lowEpoch when fetch finds additional owned/touched ranges - node.withEpoch should callback with TopologyRetiredException, not throw - Recovery can race with durable-applied pruning; must not send durable unless latest ballot on apply - removeRedundantDependencies was not slicing pre-bootstrap range calculation to participating ranges - NPE in TopologyManager.atLeast caused by referencing an epoch that has been GC'd - use journal durableBeforePersister in burn test, not NOOP_PERSISTER - ServerUtils.cleanupDirectory use tryDeleteRecursive - FsyncRunnable shutdown - fix NPE in AccordJournalBurnTest patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20688 --- modules/accord | 2 +- src/java/org/apache/cassandra/io/util/File.java | 9 +++++ .../org/apache/cassandra/io/util/PathUtils.java | 46 +++++++++++++++++----- src/java/org/apache/cassandra/journal/Flusher.java | 18 +++++++-- .../service/accord/AccordCommandStore.java | 4 +- .../service/accord/AccordConfigurationService.java | 4 +- .../accord/AccordJournalValueSerializers.java | 2 - .../cassandra/service/accord/AccordKeyspace.java | 1 - 
.../service/accord/AccordSafeCommandStore.java | 1 - .../cassandra/service/accord/AccordService.java | 1 - .../cassandra/service/accord/AccordTask.java | 5 +-- .../service/accord/CommandsForRanges.java | 2 +- .../accord/interop/AccordInteropAdapter.java | 21 +++++----- .../service/accord/interop/AccordInteropApply.java | 19 ++++----- .../accord/interop/AccordInteropExecution.java | 6 ++- .../accord/interop/AccordInteropPersist.java | 5 ++- .../accord/serializers/ApplySerializers.java | 14 ++++--- .../accord/serializers/CheckStatusSerializers.java | 7 +++- .../accord/serializers/ReadDataSerializers.java | 1 - .../accord/serializers/ResultSerializers.java | 2 +- .../accord/serializers/TxnRequestSerializer.java | 2 +- .../service/accord/txn/AccordUpdateParameters.java | 2 - .../cassandra/service/accord/txn/TxnNamedRead.java | 3 -- .../cassandra/service/accord/txn/TxnUpdate.java | 6 +-- .../distributed/test/HintsMaxSizeTest.java | 2 - .../test/SSTableLoaderEncryptionOptionsTest.java | 2 - .../test/log/FetchLogFromPeers2Test.java | 4 +- .../distributed/test/tcm/SplitBrainTest.java | 2 - .../service/accord/AccordJournalBurnTest.java | 23 ++++++++++- .../unit/org/apache/cassandra/ServerTestUtils.java | 2 +- .../commitlog/CommitLogSegmentManagerCDCTest.java | 2 - .../db/commitlog/CommitlogShutdownTest.java | 2 - .../cassandra/hints/HintServiceBytemanTest.java | 2 - .../cassandra/io/util/SafeMemoryWriterTest.java | 2 - .../serializers/CheckStatusSerializersTest.java | 4 +- .../serializers/CommandsForKeySerializerTest.java | 1 - .../cassandra/tcm/DiscoverySimulationTest.java | 2 - 37 files changed, 143 insertions(+), 90 deletions(-) diff --git a/modules/accord b/modules/accord index 7f95490b13..c5a984cfe4 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 7f95490b1390b7fc68a4ff4ced7f161bafd8776b +Subproject commit c5a984cfe41bb8d8f1d7a4cb446c194f829a5dd1 diff --git a/src/java/org/apache/cassandra/io/util/File.java 
b/src/java/org/apache/cassandra/io/util/File.java index de415388ed..e814e37b74 100644 --- a/src/java/org/apache/cassandra/io/util/File.java +++ b/src/java/org/apache/cassandra/io/util/File.java @@ -225,6 +225,15 @@ public class File implements Comparable<File> PathUtils.deleteRecursive(toPathForWrite()); } + /** + * Deletes all files and subdirectories under "dir". + * @return false if the root cannot be deleted + */ + public boolean tryDeleteRecursive() + { + return PathUtils.tryDeleteRecursive(toPathForWrite()); + } + /** * Try to delete the file on process exit. */ diff --git a/src/java/org/apache/cassandra/io/util/PathUtils.java b/src/java/org/apache/cassandra/io/util/PathUtils.java index 8ddd939b4c..fa0a91543e 100644 --- a/src/java/org/apache/cassandra/io/util/PathUtils.java +++ b/src/java/org/apache/cassandra/io/util/PathUtils.java @@ -346,11 +346,22 @@ public final class PathUtils private static void deleteRecursiveUsingNixCommand(Path path, boolean quietly) { String [] cmd = new String[]{ "rm", quietly ? "-rdf" : "-rd", path.toAbsolutePath().toString() }; + IOException failure = null; + if (!quietly && !Files.exists(path)) + failure = new NoSuchFileException(path.toString()); + + if (failure == null) + failure = tryDeleteRecursiveUsingNixCommand(path, quietly); + + if (failure != null) + throw propagateUnchecked(failure, path, true); + } + + private static IOException tryDeleteRecursiveUsingNixCommand(Path path, boolean quietly) + { + String[] cmd = new String[]{ "rm", quietly ? 
"-rdf" : "-rd", path.toAbsolutePath().toString() }; try { - if (!quietly && !Files.exists(path)) - throw new NoSuchFileException(path.toString()); - Process p = Runtime.getRuntime().exec(cmd); int result = p.waitFor(); @@ -363,24 +374,39 @@ public final class PathUtils } if (result != 0 && Files.exists(path)) - { - logger.error("{} returned:\nstdout:\n{}\n\nstderr:\n{}", Arrays.toString(cmd), out, err); - throw new IOException(String.format("%s returned non-zero exit code: %d%nstdout:%n%s%n%nstderr:%n%s", Arrays.toString(cmd), result, out, err)); - } + return new IOException(String.format("%s returned non-zero exit code: %d%nstdout:%n%s%n%nstderr:%n%s", Arrays.toString(cmd), result, out, err)); onDeletion.accept(path); + return null; } catch (IOException e) { - throw propagateUnchecked(e, path, true); + return e; } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new FSWriteError(e, path); + return new IOException("Interrupted while executing command " + Arrays.toString(cmd), e); } } + + /** + * Deletes all files and subdirectories under "path". + * @param path file to be deleted + * @return false if the root cannot be deleted + */ + public static boolean tryDeleteRecursive(Path path) + { + if (USE_NIX_RECURSIVE_DELETE.getBoolean() && path.getFileSystem() == java.nio.file.FileSystems.getDefault()) + return null == tryDeleteRecursiveUsingNixCommand(path, true); + + if (isDirectory(path)) + forEach(path, PathUtils::tryDeleteRecursive); + + // The directory should now be empty, so now it can be smoked + return tryDelete(path); + } + /** * Deletes all files and subdirectories under "path". 
* @param path file to be deleted diff --git a/src/java/org/apache/cassandra/journal/Flusher.java b/src/java/org/apache/cassandra/journal/Flusher.java index d48d80171b..02f85df0cb 100644 --- a/src/java/org/apache/cassandra/journal/Flusher.java +++ b/src/java/org/apache/cassandra/journal/Flusher.java @@ -143,10 +143,20 @@ final class Flusher<K, V> } } + private boolean hasWork() + { + return hasWork(fsyncStartedFor); + } + + private boolean hasWork(long lastStartedAt) + { + return fsyncWaitingSince != lastStartedAt; + } + private void awaitWork() throws InterruptedException { long lastStartedAt = fsyncStartedFor; - if (fsyncWaitingSince != lastStartedAt) + if (hasWork(lastStartedAt)) return; awaitingWork = Thread.currentThread(); @@ -158,7 +168,7 @@ final class Flusher<K, V> throw new InterruptedException(); } - if (fsyncWaitingSince != lastStartedAt) + if (hasWork(lastStartedAt)) break; LockSupport.park(); @@ -175,7 +185,9 @@ final class Flusher<K, V> public void doRun(Interruptible.State state) throws InterruptedException { - awaitWork(); + if (state == NORMAL) awaitWork(); + else if (!hasWork()) return; + if (fsyncing == null) fsyncing = journal.oldestActiveSegment(); diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java index 4e8629696a..ab74a128a7 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java @@ -501,7 +501,9 @@ public class AccordCommandStore extends CommandStore RedundantBefore redundantBefore; if (safeRedundantBefore == null) redundantBefore = RedundantBefore.EMPTY; else redundantBefore = safeRedundantBefore.redundantBefore; - return new AccordCompactionInfo(id, redundantBefore, rangesForEpoch, tableId); + CommandStores.RangesForEpoch ranges = this.rangesForEpoch; + if (ranges == null) ranges = CommandStores.RangesForEpoch.EMPTY; + return new 
AccordCompactionInfo(id, redundantBefore, ranges, tableId); } public RangeSearcher rangeSearcher() diff --git a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java index e0ca26ee5d..fade204b1f 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java @@ -38,6 +38,7 @@ import accord.primitives.Ranges; import accord.topology.Shard; import accord.topology.Topology; import accord.utils.Invariants; +import accord.utils.SortedListSet; import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; import org.agrona.collections.LongArrayList; @@ -449,8 +450,7 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc epochState.setSyncStatus(SyncStatus.NOTIFYING); } - // TODO (required): replace with SortedArraySet when it is available - Set<Node.Id> notify = new HashSet<>(topology.nodes()); + Set<Node.Id> notify = SortedListSet.allOf(topology.nodes()); notify.remove(localId); syncPropagator.reportSyncComplete(epoch, notify, localId); } diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java index e72beaab2c..58b238d31f 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java @@ -240,8 +240,6 @@ public class AccordJournalValueSerializers @Override public void deserialize(JournalKey journalKey, DurableBeforeAccumulator into, DataInputPlus in, Version userVersion) throws IOException { - // TODO: maybe using local serializer is not the best call here, but how do we distinguish - // between messaging and disk versioning? 
into.update(CommandStoreSerializers.durableBefore.deserialize(in)); } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index e87250fa52..e597d7d415 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -129,7 +129,6 @@ public class AccordKeyspace public static final Set<String> TABLE_NAMES = ImmutableSet.of(COMMANDS_FOR_KEY, JOURNAL); - // TODO (desired): implement a custom type so we can get correct sort order public static final TupleType TIMESTAMP_TYPE = new TupleType(Lists.newArrayList(LongType.instance, LongType.instance, Int32Type.instance)); private static final ClusteringIndexFilter FULL_PARTITION = new ClusteringIndexNamesFilter(BTreeSet.of(new ClusteringComparator(), Clustering.EMPTY), false); diff --git a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java index 5ace8976c8..bc7c7cf841 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java @@ -225,7 +225,6 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC commandsForRanges.visit(keysOrRanges, startedBefore, testKind, visitor, p1, p2); } - // TODO (expected): instead of accepting a slice, accept the min/max epoch and let implementation handle it @Override public boolean visit(Unseekables<?> keysOrRanges, TxnId testTxnId, Txn.Kind.Kinds testKind, TestStartedAt testStartedAt, Timestamp testStartedAtTimestamp, ComputeIsDep computeIsDep, AllCommandVisitor visit) { diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 48c92e6296..f85aee62af 100644 --- 
a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -467,7 +467,6 @@ public class AccordService implements IAccordService, Shutdownable if (response.current >= from) return response; - metadata = ClusterMetadata.current(); } catch (Throwable e) { diff --git a/src/java/org/apache/cassandra/service/accord/AccordTask.java b/src/java/org/apache/cassandra/service/accord/AccordTask.java index 61d86db9bf..9930d2093e 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordTask.java +++ b/src/java/org/apache/cassandra/service/accord/AccordTask.java @@ -194,11 +194,11 @@ public abstract class AccordTask<R> extends Task implements Runnable, Function<S private final String loggingId; private static final AtomicLong nextLoggingId = new AtomicLong(Clock.Global.currentTimeMillis()); - // TODO (expected): merge all of these maps into one + // TODO (desired): merge all of these maps into one @Nullable Object2ObjectHashMap<TxnId, AccordSafeCommand> commands; @Nullable Object2ObjectHashMap<RoutingKey, AccordSafeCommandsForKey> commandsForKey; @Nullable Object2ObjectHashMap<Object, AccordSafeState<?, ?>> loading; - // TODO (expected): collection supporting faster deletes but still fast poll (e.g. some ordered collection) + // TODO (desired): collection supporting faster deletes but still fast poll (e.g. some ordered collection) @Nullable ArrayDeque<AccordCacheEntry<?, ?>> waitingToLoad; @Nullable RangeTxnScanner rangeScanner; boolean hasRanges; @@ -662,7 +662,6 @@ public abstract class AccordTask<R> extends Task implements Runnable, Function<S safeStore = commandStore.begin(this, commandsForRanges); R result = apply(safeStore); - // TODO (required): currently, we are not very efficient about ensuring that we persist the absolute minimum amount of state. Improve that. 
List<Journal.CommandUpdate> changes = null; if (commands != null) { diff --git a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java index ffd1754d2c..470822906f 100644 --- a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java +++ b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java @@ -46,7 +46,7 @@ import org.apache.cassandra.service.accord.api.TokenKey; import static accord.local.CommandSummaries.SummaryStatus.NOT_DIRECTLY_WITNESSED; -// TODO (required): move to accord-core, merge with existing logic there +// TODO (expected): move to accord-core, merge with existing logic there public class CommandsForRanges extends TreeMap<Timestamp, Summary> implements CommandSummaries.ByTxnIdSnapshot { public CommandsForRanges(Map<? extends Timestamp, ? extends Summary> m) diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java index 91acc361d1..ca466105eb 100644 --- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java +++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java @@ -31,6 +31,7 @@ import accord.coordinate.ExecuteFlag.ExecuteFlags; import accord.coordinate.ExecutePath; import accord.local.Node; import accord.messages.Apply; +import accord.primitives.Ballot; import accord.primitives.Deps; import accord.primitives.FullRoute; import accord.primitives.Route; @@ -86,23 +87,23 @@ public class AccordInteropAdapter extends TxnAdapter } @Override - public void execute(Node node, Topologies any, FullRoute<?> route, ExecutePath path, ExecuteFlags executeFlags, TxnId txnId, Txn txn, Timestamp executeAt, Deps stableDeps, Deps sendDeps, BiConsumer<? 
super Result, Throwable> callback) + public void execute(Node node, Topologies any, FullRoute<?> route, Ballot ballot, ExecutePath path, ExecuteFlags executeFlags, TxnId txnId, Txn txn, Timestamp executeAt, Deps stableDeps, Deps sendDeps, BiConsumer<? super Result, Throwable> callback) { - if (!doInteropExecute(node, route, txnId, txn, executeAt, stableDeps, callback)) - super.execute(node, any, route, path, executeFlags, txnId, txn, executeAt, stableDeps, sendDeps, callback); + if (!doInteropExecute(node, route, ballot, txnId, txn, executeAt, stableDeps, callback)) + super.execute(node, any, route, ballot, path, executeFlags, txnId, txn, executeAt, stableDeps, sendDeps, callback); } @Override - public void persist(Node node, Topologies any, Route<?> require, Route<?> sendTo, SelectNodeOwnership selectSendTo, FullRoute<?> route, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, BiConsumer<? super Result, Throwable> callback) + public void persist(Node node, Topologies any, Route<?> require, Route<?> sendTo, SelectNodeOwnership selectSendTo, FullRoute<?> route, Ballot ballot, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, BiConsumer<? super Result, Throwable> callback) { - if (applyKind == Minimal && doInteropPersist(node, any, require, sendTo, selectSendTo, txnId, txn, executeAt, deps, writes, result, route, callback)) + if (applyKind == Minimal && doInteropPersist(node, any, require, sendTo, selectSendTo, ballot, txnId, txn, executeAt, deps, writes, result, route, callback)) return; - super.persist(node, any, require, sendTo, selectSendTo, route, txnId, txn, executeAt, deps, writes, result, callback); + super.persist(node, any, require, sendTo, selectSendTo, route, ballot, txnId, txn, executeAt, deps, writes, result, callback); } - private boolean doInteropExecute(Node node, FullRoute<?> route, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, BiConsumer<? 
super Result, Throwable> callback) + private boolean doInteropExecute(Node node, FullRoute<?> route, Ballot ballot, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, BiConsumer<? super Result, Throwable> callback) { // Unrecoverable repair always needs to be run by AccordInteropExecution AccordUpdate.Kind updateKind = AccordUpdate.kind(txn.update()); @@ -110,12 +111,12 @@ public class AccordInteropAdapter extends TxnAdapter if (updateKind != AccordUpdate.Kind.UNRECOVERABLE_REPAIR && (consistencyLevel == null || consistencyLevel == ConsistencyLevel.ONE || txn.read().keys().isEmpty())) return false; - new AccordInteropExecution(node, txnId, txn, updateKind, route, executeAt, deps, callback, executor, consistencyLevel, endpointMapper) + new AccordInteropExecution(node, txnId, txn, updateKind, route, ballot, executeAt, deps, callback, executor, consistencyLevel, endpointMapper) .start(); return true; } - private boolean doInteropPersist(Node node, Topologies any, Route<?> require, Route<?> sendTo, SelectNodeOwnership selectSendTo, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, FullRoute<?> fullRoute, BiConsumer<? super Result, Throwable> callback) + private boolean doInteropPersist(Node node, Topologies any, Route<?> require, Route<?> sendTo, SelectNodeOwnership selectSendTo, Ballot ballot, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, FullRoute<?> fullRoute, BiConsumer<? super Result, Throwable> callback) { Update update = txn.update(); ConsistencyLevel consistencyLevel = update instanceof AccordUpdate ? 
((AccordUpdate) update).cassandraCommitCL() : null; @@ -123,7 +124,7 @@ public class AccordInteropAdapter extends TxnAdapter return false; Topologies all = execution(node, any, sendTo, selectSendTo, fullRoute, txnId, executeAt); - new AccordInteropPersist(node, all, txnId, require, txn, executeAt, deps, writes, result, fullRoute, consistencyLevel, callback) + new AccordInteropPersist(node, all, txnId, require, ballot, txn, executeAt, deps, writes, result, fullRoute, consistencyLevel, callback) .start(Minimal, any, writes, result); return true; } diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java index b9af95db34..3a5671c970 100644 --- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java +++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java @@ -29,6 +29,7 @@ import accord.local.SafeCommandStore; import accord.local.StoreParticipants; import accord.messages.Apply; import accord.messages.MessageType; +import accord.primitives.Ballot; import accord.primitives.Deps; import accord.primitives.FullRoute; import accord.primitives.PartialDeps; @@ -65,23 +66,23 @@ public class AccordInteropApply extends Apply implements LocalListeners.ComplexL public static final Apply.Factory FACTORY = new Apply.Factory() { @Override - public Apply create(Kind kind, Id to, Topologies participates, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, FullRoute<?> fullRoute) + public Apply create(Kind kind, Id to, Topologies participates, TxnId txnId, Ballot ballot, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, FullRoute<?> fullRoute) { checkArgument(kind != Kind.Maximal, "Shouldn't need to send a maximal commit with interop support"); ConsistencyLevel commitCL = txn.update() instanceof AccordUpdate ? 
((AccordUpdate) txn.update()).cassandraCommitCL() : null; // Any asynchronous apply option should use the regular Apply that doesn't wait for writes to complete if (commitCL == null || commitCL == ConsistencyLevel.ANY) - return Apply.FACTORY.create(kind, to, participates, txnId, route, txn, executeAt, deps, writes, result, fullRoute); - return new AccordInteropApply(kind, to, participates, txnId, route, txn, executeAt, deps, writes, result, fullRoute); + return Apply.FACTORY.create(kind, to, participates, txnId, ballot, route, txn, executeAt, deps, writes, result, fullRoute); + return new AccordInteropApply(kind, to, participates, txnId, ballot, route, txn, executeAt, deps, writes, result, fullRoute); } }; public static final IVersionedSerializer<AccordInteropApply> serializer = new ApplySerializer<AccordInteropApply>() { @Override - protected AccordInteropApply deserializeApply(TxnId txnId, Route<?> scope, long minEpoch, long waitForEpoch, long maxEpoch, Apply.Kind kind, Timestamp executeAt, PartialDeps deps, PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result result) + protected AccordInteropApply deserializeApply(TxnId txnId, Ballot ballot, Route<?> scope, long minEpoch, long waitForEpoch, long maxEpoch, Apply.Kind kind, Timestamp executeAt, PartialDeps deps, PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result result) { - return new AccordInteropApply(kind, txnId, scope, minEpoch, waitForEpoch, maxEpoch, executeAt, deps, txn, fullRoute, writes, result); + return new AccordInteropApply(kind, txnId, ballot, scope, minEpoch, waitForEpoch, maxEpoch, executeAt, deps, txn, fullRoute, writes, result); } }; @@ -89,14 +90,14 @@ public class AccordInteropApply extends Apply implements LocalListeners.ComplexL transient Int2ObjectHashMap<LocalListeners.Registered> listeners; boolean failed; - private AccordInteropApply(Kind kind, TxnId txnId, Route<?> route, long minEpoch, long waitForEpoch, long maxEpoch, Timestamp executeAt, 
PartialDeps deps, @Nullable PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result result) + private AccordInteropApply(Kind kind, TxnId txnId, Ballot ballot, Route<?> route, long minEpoch, long waitForEpoch, long maxEpoch, Timestamp executeAt, PartialDeps deps, @Nullable PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result result) { - super(kind, txnId, route, minEpoch, waitForEpoch, maxEpoch, executeAt, deps, txn, fullRoute, writes, result); + super(kind, txnId, ballot, route, minEpoch, waitForEpoch, maxEpoch, executeAt, deps, txn, fullRoute, writes, result); } - private AccordInteropApply(Kind kind, Id to, Topologies participates, TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, FullRoute<?> fullRoute) + private AccordInteropApply(Kind kind, Id to, Topologies participates, TxnId txnId, Ballot ballot, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, FullRoute<?> fullRoute) { - super(kind, to, participates, txnId, route, txn, executeAt, deps, writes, result, fullRoute); + super(kind, to, participates, txnId, ballot, route, txn, executeAt, deps, writes, result, fullRoute); } @Override diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java index 2d82cf1749..ffd42f0c48 100644 --- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java +++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java @@ -144,6 +144,7 @@ public class AccordInteropExecution implements ReadCoordinator, MaximalCommitSen private final TxnId txnId; private final Txn txn; private final FullRoute<?> route; + private final Ballot ballot; private final Timestamp executeAt; private final Deps deps; private final BiConsumer<? 
super Result, Throwable> callback; @@ -161,7 +162,7 @@ public class AccordInteropExecution implements ReadCoordinator, MaximalCommitSen private final Set<InetAddressAndPort> contacted; private final AccordUpdate.Kind updateKind; - public AccordInteropExecution(Node node, TxnId txnId, Txn txn, AccordUpdate.Kind updateKind, FullRoute<?> route, Timestamp executeAt, Deps deps, BiConsumer<? super Result, Throwable> callback, + public AccordInteropExecution(Node node, TxnId txnId, Txn txn, AccordUpdate.Kind updateKind, FullRoute<?> route, Ballot ballot, Timestamp executeAt, Deps deps, BiConsumer<? super Result, Throwable> callback, AgentExecutor executor, ConsistencyLevel consistencyLevel, AccordEndpointMapper endpointMapper) { requireArgument(!txn.read().keys().isEmpty() || updateKind == AccordUpdate.Kind.UNRECOVERABLE_REPAIR); @@ -169,6 +170,7 @@ public class AccordInteropExecution implements ReadCoordinator, MaximalCommitSen this.txnId = txnId; this.txn = txn; this.route = route; + this.ballot = ballot; this.executeAt = executeAt; this.deps = deps; this.callback = callback; @@ -402,7 +404,7 @@ public class AccordInteropExecution implements ReadCoordinator, MaximalCommitSen CommandStore cs = node.commandStores().select(route.homeKey()); result.beginAsResult().withExecutor(cs).begin((data, failure) -> { if (failure == null) - ((CoordinationAdapter)node.coordinationAdapter(txnId, Standard)).persist(node, executes, route, txnId, txn, executeAt, deps, txnId.is(Write) ? txn.execute(txnId, executeAt, data) : null, txn.result(txnId, executeAt, data), callback); + ((CoordinationAdapter)node.coordinationAdapter(txnId, Standard)).persist(node, executes, route, ballot, txnId, txn, executeAt, deps, txnId.is(Write) ? 
txn.execute(txnId, executeAt, data) : null, txn.result(txnId, executeAt, data), callback); else callback.accept(null, failure); }); diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropPersist.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropPersist.java index e61d3934b5..967438ea56 100644 --- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropPersist.java +++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropPersist.java @@ -28,6 +28,7 @@ import accord.coordinate.tracking.RequestStatus; import accord.coordinate.tracking.ResponseTracker; import accord.local.Node; import accord.messages.Apply; +import accord.primitives.Ballot; import accord.primitives.Deps; import accord.primitives.FullRoute; import accord.primitives.Route; @@ -108,9 +109,9 @@ public class AccordInteropPersist extends Persist private final ConsistencyLevel consistencyLevel; private CallbackHolder callback; - public AccordInteropPersist(Node node, Topologies topologies, TxnId txnId, Route<?> sendTo, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, FullRoute<?> fullRoute, ConsistencyLevel consistencyLevel, BiConsumer<? super Result, Throwable> clientCallback) + public AccordInteropPersist(Node node, Topologies topologies, TxnId txnId, Route<?> sendTo, Ballot ballot, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, FullRoute<?> fullRoute, ConsistencyLevel consistencyLevel, BiConsumer<? 
super Result, Throwable> clientCallback) { - super(node, topologies, txnId, sendTo, txn, executeAt, deps, writes, result, fullRoute, AccordInteropApply.FACTORY); + super(node, topologies, txnId, ballot, sendTo, txn, executeAt, deps, writes, result, fullRoute, AccordInteropApply.FACTORY); Invariants.requireArgument(consistencyLevel == ConsistencyLevel.QUORUM || consistencyLevel == ConsistencyLevel.ALL || consistencyLevel == ConsistencyLevel.SERIAL || consistencyLevel == ConsistencyLevel.ONE); this.consistencyLevel = consistencyLevel; registerClientCallback(result, clientCallback); diff --git a/src/java/org/apache/cassandra/service/accord/serializers/ApplySerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/ApplySerializers.java index cf3f449354..181634cc50 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/ApplySerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/ApplySerializers.java @@ -22,6 +22,7 @@ import java.io.IOException; import accord.api.Result; import accord.messages.Apply; +import accord.primitives.Ballot; import accord.primitives.FullRoute; import accord.primitives.PartialDeps; import accord.primitives.PartialTxn; @@ -65,6 +66,7 @@ public class ApplySerializers @Override public void serializeBody(A apply, DataOutputPlus out, Version version) throws IOException { + CommandSerializers.ballot.serialize(apply.ballot, out); out.writeVInt(apply.minEpoch - apply.waitForEpoch); out.writeUnsignedVInt(apply.maxEpoch - apply.minEpoch); kind.serialize(apply.kind, out); @@ -76,15 +78,16 @@ public class ApplySerializers CommandSerializers.writes.serialize(apply.writes, out, version); } - protected abstract A deserializeApply(TxnId txnId, Route<?> scope, long minEpoch, long waitForEpoch, long maxEpoch, Apply.Kind kind, + protected abstract A deserializeApply(TxnId txnId, Ballot ballot, Route<?> scope, long minEpoch, long waitForEpoch, long maxEpoch, Apply.Kind kind, Timestamp executeAt, PartialDeps 
deps, PartialTxn txn, FullRoute<?> fullRoute, Writes writes, Result result); @Override public A deserializeBody(DataInputPlus in, Version version, TxnId txnId, Route<?> scope, long waitForEpoch) throws IOException { + Ballot ballot = CommandSerializers.ballot.deserialize(in); long minEpoch = waitForEpoch + in.readVInt(); long maxEpoch = minEpoch + in.readUnsignedVInt(); - return deserializeApply(txnId, scope, minEpoch, waitForEpoch, maxEpoch, + return deserializeApply(txnId, ballot, scope, minEpoch, waitForEpoch, maxEpoch, kind.deserialize(in), ExecuteAtSerializer.deserialize(txnId, in), DepsSerializers.partialDeps.deserialize(in), @@ -97,7 +100,8 @@ public class ApplySerializers @Override public long serializedBodySize(A apply, Version version) { - return TypeSizes.sizeofVInt(apply.minEpoch - apply.waitForEpoch) + return CommandSerializers.ballot.serializedSize(apply.ballot) + + TypeSizes.sizeofVInt(apply.minEpoch - apply.waitForEpoch) + TypeSizes.sizeofUnsignedVInt(apply.maxEpoch - apply.minEpoch) + kind.serializedSize(apply.kind) + ExecuteAtSerializer.serializedSize(apply.txnId, apply.executeAt) @@ -111,10 +115,10 @@ public class ApplySerializers public static final IVersionedSerializer<Apply> request = new ApplySerializer<>() { @Override - protected Apply deserializeApply(TxnId txnId, Route<?> scope, long minEpoch, long waitForEpoch, long maxEpoch, Apply.Kind kind, + protected Apply deserializeApply(TxnId txnId, Ballot ballot, Route<?> scope, long minEpoch, long waitForEpoch, long maxEpoch, Apply.Kind kind, Timestamp executeAt, PartialDeps deps, PartialTxn txn, FullRoute<?> fullRoute, Writes writes, Result result) { - return Apply.SerializationSupport.create(txnId, scope, minEpoch, waitForEpoch, maxEpoch, kind, executeAt, deps, txn, fullRoute, writes, result); + return Apply.SerializationSupport.create(txnId, ballot, scope, minEpoch, waitForEpoch, maxEpoch, kind, executeAt, deps, txn, fullRoute, writes, result); } }; diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java index 9933f53f9b..6981761612 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java @@ -130,6 +130,7 @@ public class CheckStatusSerializers KeySerializers.participants.serialize(check.query, out); out.writeUnsignedVInt(check.sourceEpoch); out.writeByte(check.includeInfo.ordinal()); + CommandSerializers.ballot.serialize(check.bumpBallot, out); } @Override @@ -139,7 +140,8 @@ public class CheckStatusSerializers Participants<?> query = KeySerializers.participants.deserialize(in); long sourceEpoch = in.readUnsignedVInt(); CheckStatus.IncludeInfo info = infos[in.readByte()]; - return new CheckStatus(txnId, query, sourceEpoch, info); + Ballot ballot = CommandSerializers.ballot.deserialize(in); + return new CheckStatus(txnId, query, sourceEpoch, info, ballot); } @Override @@ -148,7 +150,8 @@ public class CheckStatusSerializers return CommandSerializers.txnId.serializedSize(check.txnId) + KeySerializers.participants.serializedSize(check.query) + TypeSizes.sizeofUnsignedVInt(check.sourceEpoch) - + TypeSizes.BYTE_SIZE; + + TypeSizes.BYTE_SIZE + + CommandSerializers.ballot.serializedSize(check.bumpBallot); } }; diff --git a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java index 3aedf090ea..82e702b5c2 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java @@ -223,7 +223,6 @@ public class ReadDataSerializers public static final class ReplySerializer<D extends Data> implements IVersionedSerializer<ReadReply> { - // TODO (expected): use 
something other than ordinal final CommitOrReadNack[] nacks = CommitOrReadNack.values(); private final VersionedSerializer<D, Version> dataSerializer; diff --git a/src/java/org/apache/cassandra/service/accord/serializers/ResultSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/ResultSerializers.java index 0e3413905f..5d2d5efa46 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/ResultSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/ResultSerializers.java @@ -26,7 +26,7 @@ import org.apache.cassandra.io.util.DataOutputPlus; public class ResultSerializers { - // TODO (expected): this is meant to encode e.g. whether the transaction's condition met or not for clients to later query + // TODO (desired): this is meant to encode e.g. whether the transaction's condition met or not for clients to later query public static final Result APPLIED = new Result() { @Override diff --git a/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java index 800a491b6d..fe2cbe2613 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java @@ -52,7 +52,7 @@ public abstract class TxnRequestSerializer<T extends TxnRequest<?>> implements I { TxnId txnId = CommandSerializers.txnId.deserialize(in); Route<?> scope = KeySerializers.route.deserialize(in); - // TODO: there should be a base epoch + // TODO (desired): there should be a base epoch long waitForEpoch = in.readUnsignedVInt(); return deserializeBody(in, version, txnId, scope, waitForEpoch); } diff --git a/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java b/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java index 1bf46889dd..efb222b655 100644 --- 
a/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java +++ b/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java @@ -78,7 +78,6 @@ public class AccordUpdateParameters // For the time being, guardrails are disabled for Accord queries. ClientState disabledGuardrails = null; - // TODO : How should Accord work with TTL? int ttl = metadata.params.defaultTimeToLive; return new RowUpdateParameters(metadata, disabledGuardrails, @@ -103,7 +102,6 @@ public class AccordUpdateParameters checkState(data.entrySet().size() == 1, "CAS read should only have one entry"); return ImmutableMap.of(dk, value); case AUTO_READ: - // TODO (review): Is this the right DK being passed into that matches what we used to store in TxnDataName if (TxnData.txnDataNameIndex(name) == index) return ImmutableMap.of(dk, value); default: diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java index 2f84c849e6..ea27a398c2 100644 --- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java +++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java @@ -235,9 +235,6 @@ public class TxnNamedRead extends AbstractSerialized<ReadCommand, TableMetadatas if (command == null) return AsyncResults.success(TxnData.NOOP_DATA); - // TODO (required, safety): before release, double check reasoning that this is safe -// AccordCommandsForKey cfk = ((SafeAccordCommandStore)safeStore).commandsForKey(key); -// int nowInSeconds = cfk.nowInSecondsFor(executeAt, isForWriteTxn); // It's fine for our nowInSeconds to lag slightly our insertion timestamp, as to the user // this simply looks like the transaction witnessed TTL'd data and the data then expired // immediately after the transaction executed, and this simplifies things a great deal diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java index 8f3baf374e..1fdc0e54f9 100644 --- a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java +++ b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java @@ -174,7 +174,7 @@ public class TxnUpdate extends AccordUpdate public Update slice(Ranges ranges) { Keys keys = this.keys.slice(ranges); - // TODO: Slice the condition. + // TODO (desired): Slice the condition. return new TxnUpdate(tables, keys, select(this.keys, keys, fragments), condition, cassandraCommitCL, preserveTimestamps); } @@ -182,7 +182,7 @@ public class TxnUpdate extends AccordUpdate public Update intersecting(Participants<?> participants) { Keys keys = this.keys.intersecting(participants); - // TODO: Slice the condition. + // TODO (desired): Slice the condition. return new TxnUpdate(tables, keys, select(this.keys, keys, fragments), condition, cassandraCommitCL, preserveTimestamps); } @@ -201,9 +201,9 @@ public class TxnUpdate extends AccordUpdate @Override public Update merge(Update update) { - // TODO: special method for linear merging keyed and non-keyed lists simultaneously TxnUpdate that = (TxnUpdate) update; Keys mergedKeys = this.keys.with(that.keys); + // TODO (desired): special method for linear merging keyed and non-keyed lists simultaneously ByteBuffer[] mergedFragments = merge(this.keys, that.keys, this.fragments, that.fragments, mergedKeys.size()); return new TxnUpdate(tables, mergedKeys, mergedFragments, condition, cassandraCommitCL, preserveTimestamps); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/HintsMaxSizeTest.java b/test/distributed/org/apache/cassandra/distributed/test/HintsMaxSizeTest.java index b1cf19e703..7e25f4311d 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/HintsMaxSizeTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/HintsMaxSizeTest.java @@ -19,7 +19,6 @@ package org.apache.cassandra.distributed.test; import 
java.util.UUID; -import org.junit.Ignore; import org.junit.Test; import org.apache.cassandra.auth.CassandraRoleManager; @@ -44,7 +43,6 @@ import static org.awaitility.Awaitility.await; @SuppressWarnings("Convert2MethodRef") public class HintsMaxSizeTest extends TestBaseImpl { - @Ignore @Test public void testMaxHintedHandoffSize() throws Exception { diff --git a/test/distributed/org/apache/cassandra/distributed/test/SSTableLoaderEncryptionOptionsTest.java b/test/distributed/org/apache/cassandra/distributed/test/SSTableLoaderEncryptionOptionsTest.java index b3de4edbd6..ea3c980586 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/SSTableLoaderEncryptionOptionsTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/SSTableLoaderEncryptionOptionsTest.java @@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableMap; import org.apache.commons.io.FileUtils; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.apache.cassandra.config.DatabaseDescriptor; @@ -106,7 +105,6 @@ public class SSTableLoaderEncryptionOptionsTest extends AbstractEncryptionOption assertRows(CLUSTER.get(1).executeInternal("SELECT count(*) FROM ssl_upload_tables.test"), row(42L)); } - @Ignore @Test public void bulkLoaderSuccessfullyStreamsOverSslWithDeprecatedSslStoragePort() throws Throwable { diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java index d42c379683..78c67c9e15 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/FetchLogFromPeers2Test.java @@ -90,7 +90,9 @@ public class FetchLogFromPeers2Test extends TestBaseImpl cluster.coordinator(coordinator).execute(withKeyspace(query), ConsistencyLevel.QUORUM); fail("should fail"); } - catch (Exception 
ignored) {} + catch (Exception ignored) + { + } boolean metricBumped = false; for (int i = 1; i <= cluster.size(); i++) diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/SplitBrainTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/SplitBrainTest.java index 9e35a71d83..1382f8b063 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tcm/SplitBrainTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/SplitBrainTest.java @@ -30,7 +30,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import org.apache.cassandra.distributed.Cluster; @@ -57,7 +56,6 @@ import static org.junit.Assert.assertTrue; public class SplitBrainTest extends TestBaseImpl { - @Ignore @Test public void testSplitBrainStartup() throws IOException, TimeoutException { diff --git a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java index 14d1587c5b..0c0f43dc2d 100644 --- a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java +++ b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java @@ -40,11 +40,17 @@ import accord.impl.TopologyFactory; import accord.impl.basic.Cluster; import accord.impl.basic.RandomDelayQueue; import accord.local.CommandStores; +import accord.local.DurableBefore; import accord.local.Node; +import accord.local.RedundantBefore; import accord.primitives.EpochSupplier; import accord.utils.DefaultRandom; import accord.utils.Invariants; +import accord.utils.PersistentField; import accord.utils.RandomSource; +import accord.utils.async.AsyncChains; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; import org.apache.cassandra.ServerTestUtils; import org.apache.cassandra.db.ColumnFamilyStore; import 
org.apache.cassandra.db.Directories; @@ -308,6 +314,13 @@ public class AccordJournalBurnTest extends BurnTestBase this.closeCurrentSegmentForTestingIfNonEmpty(); super.replay(commandStores); } + + @Override + public PersistentField.Persister<DurableBefore, DurableBefore> durableBeforePersister() + { + // TODO (required): we should be persisting in the journal, but this currently causes the burn test to take far too long + return DurableBefore.NOOP_PERSISTER; + } }; return journal; @@ -330,9 +343,15 @@ public class AccordJournalBurnTest extends BurnTestBase { IAccordService.AccordCompactionInfos compactionInfos = new IAccordService.AccordCompactionInfos(node.durableBefore(), node.topology().minEpoch()); node.commandStores().forEachCommandStore(commandStore -> { + RedundantBefore redundantBefore = commandStore.unsafeGetRedundantBefore(); + if (redundantBefore == null) + redundantBefore = RedundantBefore.EMPTY; + CommandStores.RangesForEpoch rangesForEpoch = commandStore.unsafeGetRangesForEpoch(); + if (rangesForEpoch == null) + rangesForEpoch = CommandStores.RangesForEpoch.EMPTY; compactionInfos.put(commandStore.id(), new IAccordService.AccordCompactionInfo(commandStore.id(), - commandStore.unsafeGetRedundantBefore(), - commandStore.unsafeGetRangesForEpoch(), + redundantBefore, + rangesForEpoch, tableId)); }); return compactionInfos; diff --git a/test/unit/org/apache/cassandra/ServerTestUtils.java b/test/unit/org/apache/cassandra/ServerTestUtils.java index a9f53a340e..702e2b6871 100644 --- a/test/unit/org/apache/cassandra/ServerTestUtils.java +++ b/test/unit/org/apache/cassandra/ServerTestUtils.java @@ -238,7 +238,7 @@ public final class ServerTestUtils { if (directory.exists()) { - Arrays.stream(directory.tryList()).forEach(File::deleteRecursive); + Arrays.stream(directory.tryList()).forEach(File::tryDeleteRecursive); } } diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java 
b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java index 08b9df49f1..f23e16788e 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java @@ -33,7 +33,6 @@ import org.apache.cassandra.io.util.FileReader; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.apache.cassandra.config.DatabaseDescriptor; @@ -110,7 +109,6 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester testWithNonblockingMode(this::testSegmentFlaggingOnCreation0); } - @Ignore @Test public void testNonblockingShouldMaintainSteadyDiskUsage() throws Throwable { diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitlogShutdownTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitlogShutdownTest.java index becc9390dd..e3e5662112 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitlogShutdownTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitlogShutdownTest.java @@ -27,7 +27,6 @@ import java.util.Random; import com.google.common.collect.ImmutableMap; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; @@ -58,7 +57,6 @@ public class CommitlogShutdownTest private final static byte[] entropy = new byte[1024 * 256]; - @Ignore @Test @BMRule(name = "Make removing commitlog segments slow", targetClass = "CommitLogSegment", diff --git a/test/unit/org/apache/cassandra/hints/HintServiceBytemanTest.java b/test/unit/org/apache/cassandra/hints/HintServiceBytemanTest.java index e44f8dd973..1beaa123a8 100644 --- a/test/unit/org/apache/cassandra/hints/HintServiceBytemanTest.java +++ b/test/unit/org/apache/cassandra/hints/HintServiceBytemanTest.java @@ -26,7 +26,6 @@ import java.util.concurrent.ExecutionException; import org.junit.After; import org.junit.Before; import 
org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; @@ -92,7 +91,6 @@ public class HintServiceBytemanTest HintsService.instance.startDispatch(); } - @Ignore @Test @BMRule(name = "Delay delivering hints", targetClass = "DispatchHintsTask", diff --git a/test/unit/org/apache/cassandra/io/util/SafeMemoryWriterTest.java b/test/unit/org/apache/cassandra/io/util/SafeMemoryWriterTest.java index fd5075a9cd..8b37c2def2 100644 --- a/test/unit/org/apache/cassandra/io/util/SafeMemoryWriterTest.java +++ b/test/unit/org/apache/cassandra/io/util/SafeMemoryWriterTest.java @@ -22,7 +22,6 @@ import java.util.Arrays; import java.util.Random; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -60,7 +59,6 @@ public class SafeMemoryWriterTest testSafeMemoryWriter(CHUNK * 5, CHUNK, 65536); } - @Ignore @Test public void testOver2GBuffer() throws IOException { diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CheckStatusSerializersTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CheckStatusSerializersTest.java index 7079de790e..5a8ac1505b 100644 --- a/test/unit/org/apache/cassandra/service/accord/serializers/CheckStatusSerializersTest.java +++ b/test/unit/org/apache/cassandra/service/accord/serializers/CheckStatusSerializersTest.java @@ -69,12 +69,12 @@ public class CheckStatusSerializersTest switch (domain) { case Key: - // TODO (coverage): don't hard code murmur + // TODO (desired): don't hard code murmur Gen<TokenKey> keyGen = AccordGenerators.routingKeyGen(fromQT(CassandraGenerators.TABLE_ID_GEN), Gens.constant(AccordGenerators.RoutingKeyKind.TOKEN), fromQT(CassandraGenerators.murmurToken()), Murmur3Partitioner.instance); TokenKey homeKey = keyGen.next(rs); List<TokenKey> forOrdering = Gens.lists(keyGen).unique().ofSizeBetween(1, 10).next(rs); forOrdering.sort(Comparator.naturalOrder()); - // TODO (coverage): don't hard 
code keys type + // TODO (desired): don't hard code keys type keysOrRanges = new FullKeyRoute(homeKey, forOrdering.toArray(RoutingKey[]::new)); break; case Range: diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java index 90241b7475..dcea11348d 100644 --- a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java +++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java @@ -479,7 +479,6 @@ public class CommandsForKeySerializerTest } } - // TODO (expected): we currently don't explore TruncatedApply statuses because we don't transition through all phases and therefore don't adopt the Applied status Choices<SaveStatus> saveStatusChoices = Choices.uniform(EnumSet.complementOf(EnumSet.of(SaveStatus.TruncatedApply, SaveStatus.TruncatedUnapplied, SaveStatus.TruncatedApplyWithOutcome)).toArray(SaveStatus[]::new)); Supplier<SaveStatus> saveStatusSupplier = () -> { SaveStatus result = saveStatusChoices.choose(source); diff --git a/test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java b/test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java index eae6ea2f62..6021355442 100644 --- a/test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java +++ b/test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java @@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.NotImplementedException; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.apache.cassandra.config.DatabaseDescriptor; @@ -78,7 +77,6 @@ public class DiscoverySimulationTest log.readyUnchecked(); } - @Ignore @Test public void discoveryTest() throws Throwable { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org 
For additional commands, e-mail: commits-help@cassandra.apache.org