This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 13a3ae9297 Fix:  - DurabilityQueue/ShardScheduler deadlock  - 
MemtableCleanerThread.Cleanup assumes Boolean parameter is non-null, which is 
invalid if an exception has been thrown  - AccordDurableOnFlush may be invoked 
while Accord is starting up, so should use AccordService.unsafeInstance  - 
AccordCache shrink without lock regression  - Cleanup system_accord compaction 
leftovers before starting up  - system_accord_debug.txn order  - 
system_accord_debug.txn_blocked_by order  - sy [...]
13a3ae9297 is described below

commit 13a3ae92976ae0724b0d20cca12b80c0174fb4f7
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Fri Nov 28 15:20:28 2025 +0000

    Fix:
     - DurabilityQueue/ShardScheduler deadlock
     - MemtableCleanerThread.Cleanup assumes Boolean parameter is non-null, 
which is invalid if an exception has been thrown
     - AccordDurableOnFlush may be invoked while Accord is still starting up, so 
it must use AccordService.unsafeInstance
     - AccordCache shrink without lock regression
     - Clean up system_accord compaction leftovers before starting up
     - system_accord_debug.txn order
     - system_accord_debug.txn_blocked_by order
     - system_accord_debug.shard_epochs order
    Improve:
     - Set the DefaultProgressLog CATCH_UP mode flag on every command store for the duration of catchup
     - IdentityAccumulators only need to readLast, not readAll
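     - Limit the number of static segments compacted into an sstable at once 
(journal compactMaxSegments, default 32)
     - If there are too many static segments (more than 100) on startup, wait 
until enough have been compacted before continuing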
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-21053
---
 modules/accord                                     |  2 +-
 .../org/apache/cassandra/config/AccordSpec.java    |  7 ++
 .../cassandra/db/virtual/AccordDebugKeyspace.java  | 19 +++--
 .../org/apache/cassandra/journal/Compactor.java    | 17 ++++-
 src/java/org/apache/cassandra/journal/Journal.java | 21 ++++++
 src/java/org/apache/cassandra/journal/Params.java  |  5 ++
 .../org/apache/cassandra/journal/Segments.java     | 11 +++
 .../apache/cassandra/service/StartupChecks.java    |  6 ++
 .../cassandra/service/accord/AccordCache.java      |  3 +-
 .../cassandra/service/accord/AccordCacheEntry.java |  7 +-
 .../service/accord/AccordDurableOnFlush.java       |  2 +-
 .../cassandra/service/accord/AccordJournal.java    | 21 ++++--
 .../service/accord/AccordJournalTable.java         | 38 +++++++++-
 .../cassandra/service/accord/AccordService.java    | 86 ++++++++++++----------
 .../cassandra/service/accord/DebugBlockedTxns.java |  6 ++
 src/java/org/apache/cassandra/tcm/Startup.java     |  2 +-
 .../utils/memory/MemtableCleanerThread.java        |  2 +-
 .../db/virtual/AccordDebugKeyspaceTest.java        | 51 +++++++++----
 .../org/apache/cassandra/journal/TestParams.java   |  6 ++
 19 files changed, 236 insertions(+), 76 deletions(-)

diff --git a/modules/accord b/modules/accord
index 973335bfe7..8ccce74581 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 973335bfe7e930c6646016a4a48cff084f49b660
+Subproject commit 8ccce745818cf80c7cff82c3554e4a88e9e540db
diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java 
b/src/java/org/apache/cassandra/config/AccordSpec.java
index 1e2379a564..74fa27580b 100644
--- a/src/java/org/apache/cassandra/config/AccordSpec.java
+++ b/src/java/org/apache/cassandra/config/AccordSpec.java
@@ -197,6 +197,7 @@ public class AccordSpec
     public static class JournalSpec implements Params
     {
         public int segmentSize = 32 << 20;
+        public int compactMaxSegments = 32;
         public FailurePolicy failurePolicy = FailurePolicy.STOP;
         public ReplayMode replayMode = ReplayMode.ONLY_NON_DURABLE;
         public FlushMode flushMode = FlushMode.PERIODIC;
@@ -225,6 +226,12 @@ public class AccordSpec
             return segmentSize;
         }
 
+        @Override
+        public int compactMaxSegments()
+        {
+            return compactMaxSegments;
+        }
+
         @Override
         public FailurePolicy failurePolicy()
         {
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java 
b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
index c0c4c82f29..de989d7f6e 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
@@ -146,6 +146,7 @@ import 
org.apache.cassandra.service.consensus.migration.TableMigrationState;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.utils.LocalizeString;
+import org.apache.cassandra.utils.concurrent.Future;
 
 import static accord.coordinate.Infer.InvalidIf.NotKnownToBeInvalid;
 import static accord.local.RedundantStatus.Property.GC_BEFORE;
@@ -169,6 +170,7 @@ import static 
org.apache.cassandra.db.virtual.VirtualTable.Sorted.ASC;
 import static org.apache.cassandra.db.virtual.VirtualTable.Sorted.SORTED;
 import static org.apache.cassandra.db.virtual.VirtualTable.Sorted.UNSORTED;
 import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_ACCORD_DEBUG;
+import static org.apache.cassandra.service.accord.AccordService.toFuture;
 import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 
 // TODO (expected): split into separate classes in own package
@@ -713,7 +715,9 @@ public class AccordDebugKeyspace extends VirtualKeyspace
             collector.partition(commandStore.id())
                      .collect(rows -> {
                          // TODO (desired): support maybe execute immediately 
with safeStore
-                         
AccordService.getBlocking(commandStore.chain((PreLoadContext.Empty) 
metadata::toString, safeStore -> { addRows(safeStore, rows); }));
+                         Future<?> future = 
toFuture(commandStore.chain((PreLoadContext.Empty) metadata::toString, 
safeStore -> { addRows(safeStore, rows); }));
+                         if 
(!future.awaitUntilThrowUncheckedOnInterrupt(collector.deadlineNanos()))
+                             throw new InternalTimeoutException();
                      });
         }
 
@@ -1601,7 +1605,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
                         "Accord per-CommandStore Transaction State",
                         "CREATE TABLE %s (\n" +
                         "  command_store_id int,\n" +
-                        "  txn_id text,\n" +
+                        "  txn_id 'TxnIdUtf8Type',\n" +
                         "  save_status text,\n" +
                         "  route text,\n" +
                         "  durability text,\n" +
@@ -1792,7 +1796,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
                 case TRY_EXECUTE:
                     run(txnId, commandStoreId, safeStore -> {
                         SafeCommand safeCommand = safeStore.unsafeGet(txnId);
-                        Commands.maybeExecute(safeStore, safeCommand, 
safeCommand.current(), true, true, NotifyWaitingOnPlus.adapter(null, true, 
true));
+                        Commands.maybeExecute(safeStore, safeCommand, 
safeCommand.current(), true, true, NotifyWaitingOnPlus.adapter(ignore -> {}, 
true, true));
                         return AsyncChains.success(null);
                     });
                     break;
@@ -2089,7 +2093,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
                         "  blocked_by_txn_id 'TxnIdUtf8Type',\n" +
                         "  save_status text,\n" +
                         "  execute_at text,\n" +
-                        "  PRIMARY KEY (txn_id, command_store_id, depth, 
blocked_by_key, blocked_by_txn_id)" +
+                        "  PRIMARY KEY (txn_id, depth, command_store_id, 
blocked_by_txn_id, blocked_by_key)" +
                         ')', TxnIdUtf8Type.instance), BEST_EFFORT, ASC);
         }
 
@@ -2111,7 +2115,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
                     DebugBlockedTxns.visit(AccordService.unsafeInstance(), 
txnId, maxDepth, collector.deadlineNanos(), txn -> {
                         String keyStr = txn.blockedViaKey == null ? "" : 
txn.blockedViaKey.toString();
                         String txnIdStr = txn.txnId == null || 
txn.txnId.equals(txnId) ? "" : txn.txnId.toString();
-                        rows.add(txn.commandStoreId, txn.depth, keyStr, 
txnIdStr)
+                        rows.add(txn.depth, txn.commandStoreId, txnIdStr, 
keyStr)
                             .eagerCollect(columns -> {
                                 columns.add("save_status", txn.saveStatus, 
TO_STRING)
                                        .add("execute_at", txn.executeAt, 
TO_STRING);
@@ -2160,8 +2164,9 @@ public class AccordDebugKeyspace extends VirtualKeyspace
                         "  quorum_fast_privileged_deps int,\n" +
                         "  quorum_fast_privileged_nodeps int,\n" +
                         "  token_end 'TokenUtf8Type',\n" +
-                        "  PRIMARY KEY (table_id, token_start, epoch_start)" +
-                        ')', UTF8Type.instance), FAIL, ASC);
+                        "  PRIMARY KEY (table_id, token_start, epoch_start))" +
+                        "  WITH CLUSTERING ORDER BY (token_start ASC, 
epoch_start DESC);"
+                        , UTF8Type.instance), FAIL, ASC);
         }
 
         @Override
diff --git a/src/java/org/apache/cassandra/journal/Compactor.java 
b/src/java/org/apache/cassandra/journal/Compactor.java
index 7a1b19b088..b6749bc390 100644
--- a/src/java/org/apache/cassandra/journal/Compactor.java
+++ b/src/java/org/apache/cassandra/journal/Compactor.java
@@ -18,9 +18,9 @@
 package org.apache.cassandra.journal;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
 import org.apache.cassandra.concurrent.Shutdownable;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
 
 import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 
@@ -40,6 +41,7 @@ public final class Compactor<K, V> implements Runnable, 
Shutdownable
     private final SegmentCompactor<K, V> segmentCompactor;
     private final ScheduledExecutorPlus executor;
     private Future<?> scheduled;
+    public final WaitQueue compacted = WaitQueue.newWaitQueue();
 
     Compactor(Journal<K, V> journal, SegmentCompactor<K, V> segmentCompactor)
     {
@@ -73,11 +75,18 @@ public final class Compactor<K, V> implements Runnable, 
Shutdownable
     @Override
     public void run()
     {
-        Set<StaticSegment<K, V>> toCompact = new HashSet<>();
+        List<StaticSegment<K, V>> toCompact = new ArrayList<>();
         journal.segments().selectStatic(toCompact);
         if (toCompact.isEmpty())
             return;
 
+        int limit = journal.params.compactMaxSegments();
+        if (toCompact.size() > limit)
+        {
+            toCompact.sort(StaticSegment::compareTo);
+            toCompact.subList(limit, toCompact.size()).clear();
+        }
+
         try
         {
             Collection<StaticSegment<K, V>> newSegments = 
segmentCompactor.compact(toCompact);
@@ -88,6 +97,8 @@ public final class Compactor<K, V> implements Runnable, 
Shutdownable
             journal.replaceCompactedSegments(toCompact, newSegments);
             for (StaticSegment<K, V> segment : toCompact)
                 segment.discard(journal);
+
+            compacted.signalAll();
         }
         catch (IOException e)
         {
diff --git a/src/java/org/apache/cassandra/journal/Journal.java 
b/src/java/org/apache/cassandra/journal/Journal.java
index 3120eb0916..ecfca32a52 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -222,6 +222,27 @@ public class Journal<K, V> implements Shutdownable
                               "Unexpected journal state after initialization", 
state);
         flusher.start();
         compactor.start();
+
+        final int maxSegments = 100;
+        if (segments.get().count(Segment::isStatic) > maxSegments)
+        {
+            while (true)
+            {
+                WaitQueue.Signal signal = compactor.compacted.register();
+                int count = segments.get().count(Segment::isStatic);
+                if (count <= maxSegments)
+                {
+                    signal.cancel();
+                    logger.info("Only {} static segments; continuing with 
startup", count);
+                    break;
+                }
+                else
+                {
+                    logger.info("Too many ({}) static segments; waiting until 
some compacted before starting up", count);
+                    signal.awaitThrowUncheckedOnInterrupt();
+                }
+            }
+        }
     }
 
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/journal/Params.java 
b/src/java/org/apache/cassandra/journal/Params.java
index 1e898ebce4..161165177d 100644
--- a/src/java/org/apache/cassandra/journal/Params.java
+++ b/src/java/org/apache/cassandra/journal/Params.java
@@ -31,6 +31,11 @@ public interface Params
      */
     int segmentSize();
 
+    /**
+     * @return maximum number of static segments to compact at once to sstable
+     */
+    int compactMaxSegments();
+
     /**
      * @return this journal's {@link FailurePolicy}
      */
diff --git a/src/java/org/apache/cassandra/journal/Segments.java 
b/src/java/org/apache/cassandra/journal/Segments.java
index bdd447ec6b..7245029fea 100644
--- a/src/java/org/apache/cassandra/journal/Segments.java
+++ b/src/java/org/apache/cassandra/journal/Segments.java
@@ -102,6 +102,17 @@ class Segments<K, V>
         return this.segments.values();
     }
 
+    public int count(Predicate<? super Segment<K, V>> predicate)
+    {
+        int count = 0;
+        for (Segment<K, V> segment : segments.values())
+        {
+            if (predicate.test(segment))
+                ++count;
+        }
+        return count;
+    }
+
     /**
      * Returns segments in timestamp order. Will allocate and sort the segment 
collection.
      */
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java 
b/src/java/org/apache/cassandra/service/StartupChecks.java
index 4ec3019465..11a27d7fe8 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -725,6 +725,12 @@ public class StartupChecks
             for (TableMetadata cfm : 
Schema.instance.getTablesAndViews(SchemaConstants.SYSTEM_KEYSPACE_NAME))
                 ColumnFamilyStore.scrubDataDirectories(cfm);
 
+            if (DatabaseDescriptor.getAccordTransactionsEnabled())
+            {
+                for (TableMetadata cfm : 
Schema.instance.getTablesAndViews(SchemaConstants.ACCORD_KEYSPACE_NAME))
+                    ColumnFamilyStore.scrubDataDirectories(cfm);
+            }
+
             try
             {
                 SystemKeyspace.checkHealth();
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCache.java 
b/src/java/org/apache/cassandra/service/accord/AccordCache.java
index fc794f6dec..6c6eb167b8 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCache.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCache.java
@@ -259,8 +259,7 @@ public class AccordCache implements CacheSize
                 {
                     //noinspection LockAcquiredButNotSafelyReleased
                     lock.lock();
-                    node.tryApplyShrink(cur, upd);
-                    queue.addLast(node);
+                    node.tryApplyShrink(cur, upd, queue);
                 }
             }
         }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java 
b/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java
index 02ee11c3db..26f46334d6 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java
@@ -30,6 +30,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 
 import accord.utils.ArrayBuffers.BufferList;
+import accord.utils.IntrusiveLinkedList;
 import accord.utils.IntrusiveLinkedListNode;
 import accord.utils.Invariants;
 import accord.utils.async.Cancellable;
@@ -595,10 +596,14 @@ public class AccordCacheEntry<K, V> extends 
IntrusiveLinkedListNode
         return ((FailedToSave)state).cause;
     }
 
-    void tryApplyShrink(Object cur, Object upd)
+    void tryApplyShrink(Object cur, Object upd, 
IntrusiveLinkedList<AccordCacheEntry<?,?>> queue)
     {
+        if (references() > 0 || !isUnqueued())
+            return;
+
         if (isLoaded() && unwrap() == cur && upd != cur && upd != null)
             applyShrink(owner.parent(), cur, upd);
+        queue.addLast(this);
     }
 
     private void applyShrink(AccordCache.Type<K, V, ?> parent, Object cur, 
Object upd)
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordDurableOnFlush.java 
b/src/java/org/apache/cassandra/service/accord/AccordDurableOnFlush.java
index 96cf75d0cd..7f1bb70a04 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordDurableOnFlush.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordDurableOnFlush.java
@@ -58,7 +58,7 @@ class AccordDurableOnFlush implements Consumer<TableMetadata>
             notify = commandStores;
             commandStores = null;
         }
-        CommandStores commandStores = 
AccordService.instance().node().commandStores();
+        CommandStores commandStores = 
AccordService.unsafeInstance().node().commandStores();
         for (Map.Entry<Integer, RedundantBefore> e : notify.entrySet())
         {
             RedundantBefore durable = e.getValue();
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 92c3629abe..885bbfa961 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -189,8 +189,9 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
         Invariants.require(status == Status.INITIALIZED);
         this.node = node;
         status = Status.STARTING;
-        journal.start();
+        // start table first to scrub directories before compactor starts
         journalTable.start();
+        journal.start();
     }
 
     public boolean started()
@@ -320,28 +321,28 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
     @Override
     public RedundantBefore loadRedundantBefore(int commandStoreId)
     {
-        IdentityAccumulator<RedundantBefore> accumulator = readAll(new 
JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, commandStoreId));
+        IdentityAccumulator<RedundantBefore> accumulator = readLast(new 
JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, commandStoreId));
         return accumulator.get();
     }
 
     @Override
     public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
     {
-        IdentityAccumulator<NavigableMap<TxnId, Ranges>> accumulator = 
readAll(new JournalKey(TxnId.NONE, JournalKey.Type.BOOTSTRAP_BEGAN_AT, 
commandStoreId));
+        IdentityAccumulator<NavigableMap<TxnId, Ranges>> accumulator = 
readLast(new JournalKey(TxnId.NONE, JournalKey.Type.BOOTSTRAP_BEGAN_AT, 
commandStoreId));
         return accumulator.get();
     }
 
     @Override
     public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
     {
-        IdentityAccumulator<NavigableMap<Timestamp, Ranges>> accumulator = 
readAll(new JournalKey(TxnId.NONE, JournalKey.Type.SAFE_TO_READ, 
commandStoreId));
+        IdentityAccumulator<NavigableMap<Timestamp, Ranges>> accumulator = 
readLast(new JournalKey(TxnId.NONE, JournalKey.Type.SAFE_TO_READ, 
commandStoreId));
         return accumulator.get();
     }
 
     @Override
     public CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId)
     {
-        IdentityAccumulator<RangesForEpoch> accumulator = readAll(new 
JournalKey(TxnId.NONE, JournalKey.Type.RANGES_FOR_EPOCH, commandStoreId));
+        IdentityAccumulator<RangesForEpoch> accumulator = readLast(new 
JournalKey(TxnId.NONE, JournalKey.Type.RANGES_FOR_EPOCH, commandStoreId));
         return accumulator.get();
     }
 
@@ -520,6 +521,16 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
         return builder;
     }
 
+    public <BUILDER extends FlyweightImage> BUILDER readLast(JournalKey key)
+    {
+        BUILDER builder = (BUILDER) key.type.serializer.mergerFor();
+        builder.reset(key);
+        // TODO (expected): this can be further improved to avoid allocating 
lambdas
+        AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER> 
serializer = (AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER>) 
key.type.serializer;
+        journalTable.readLast(key, (in, userVersion) -> 
serializer.deserialize(key, builder, in, userVersion));
+        return builder;
+    }
+
     public void forEachEntry(JournalKey key, AccordJournalTable.Reader reader)
     {
         journalTable.readAll(key, reader);
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
index 17aff49f86..40649cc432 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
@@ -365,7 +365,43 @@ public class AccordJournalTable<K extends JournalKey, V> 
implements RangeSearche
             }
         }
     }
-    
+
+    public void readLast(K key, Reader reader)
+    {
+        readLast(key, new RecordConsumerAdapter<>(reader));
+    }
+
+    public void readLast(K key, RecordConsumer<K> reader)
+    {
+        try (TableKeyIterator table = readAllFromTable(key))
+        {
+            boolean hasTableData = table.advance();
+            long minSegment = hasTableData ? table.segment : Long.MIN_VALUE;
+
+            class JournalReader implements RecordConsumer<K>
+            {
+                boolean read;
+                @Override
+                public void accept(long segment, int position, K key, 
ByteBuffer buffer, int userVersion)
+                {
+                    if (segment > minSegment)
+                    {
+                        reader.accept(segment, position, key, buffer, 
userVersion);
+                        read = true;
+                    }
+                }
+            }
+
+            // First, read all journal entries newer than anything flushed 
into sstables
+            JournalReader journalReader = new JournalReader();
+            journal.readLast(key, journalReader);
+
+            // Then, read SSTables, if we haven't found a record already
+            if (hasTableData && !journalReader.read)
+                reader.accept(table.segment, table.offset, key, table.value, 
table.userVersion);
+        }
+    }
+
     // TODO (expected): why are recordColumn and versionColumn instance 
fields, so that this cannot be a static class?
     class TableKeyIterator implements Closeable, RecordConsumer<K>
     {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index cff5f26589..e7e5762fb1 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -38,6 +38,7 @@ import javax.annotation.concurrent.GuardedBy;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 
+import accord.impl.progresslog.DefaultProgressLog;
 import accord.local.Catchup;
 import accord.topology.ActiveEpochs;
 import accord.topology.EpochReady;
@@ -131,6 +132,7 @@ import 
org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
 import static accord.api.Journal.TopologyUpdate;
 import static 
accord.api.ProtocolModifiers.Toggles.FastExec.MAY_BYPASS_SAFESTORE;
+import static accord.impl.progresslog.DefaultProgressLog.ModeFlag.CATCH_UP;
 import static accord.local.durability.DurabilityService.SyncLocal.Self;
 import static accord.local.durability.DurabilityService.SyncRemote.All;
 import static accord.messages.SimpleReply.Ok;
@@ -475,67 +477,75 @@ public class AccordService implements IAccordService, 
Shutdownable
         AccordSpec spec = DatabaseDescriptor.getAccord();
         if (!spec.catchup_on_start)
         {
-            logger.info("Not catching up with peers");
+            logger.info("Catchup disabled; continuing to startup");
             return;
         }
 
         BootstrapState bootstrapState = SystemKeyspace.getBootstrapState();
         if (bootstrapState == COMPLETED)
         {
-            long maxLatencyNanos = 
spec.catchup_on_start_fail_latency.toNanoseconds();
-            int attempts = 1;
-            while (true)
+            node.commandStores().forAllUnsafe(commandStore -> 
((DefaultProgressLog)commandStore.unsafeProgressLog()).setMode(CATCH_UP));
+            try
             {
-                logger.info("Catching up with quorum...");
-                long start = nanoTime();
-                long failAt = start + maxLatencyNanos;
-                Future<Void> f = toFuture(Catchup.catchup(node));
-                if (!f.awaitUntilThrowUncheckedOnInterrupt(failAt))
+                long maxLatencyNanos = 
spec.catchup_on_start_fail_latency.toNanoseconds();
+                int attempts = 1;
+                while (true)
                 {
-                    if (spec.catchup_on_start_exit_on_failure)
+                    logger.info("Catchup with quorum...");
+                    long start = nanoTime();
+                    long failAt = start + maxLatencyNanos;
+                    Future<Void> f = toFuture(Catchup.catchup(node));
+                    if (!f.awaitUntilThrowUncheckedOnInterrupt(failAt))
                     {
-                        logger.error("Catch up exceeded maximum latency of 
{}ns; shutting down", maxLatencyNanos);
-                        throw new RuntimeException("Could not catch up with 
peers");
+                        if (spec.catchup_on_start_exit_on_failure)
+                        {
+                            logger.error("Catchup exceeded maximum latency of 
{}ns; shutting down", maxLatencyNanos);
+                            throw new RuntimeException("Could not catchup with 
peers");
+                        }
+                        logger.error("Catchup exceeded maximum latency of 
{}ns; continuing to startup", maxLatencyNanos);
+                        break;
                     }
-                    logger.error("Catch up exceeded maximum latency of {}ns; 
starting up", maxLatencyNanos);
-                    break;
-                }
 
-                Throwable failed = f.cause();
-                if (failed != null)
-                {
-                    if (spec.catchup_on_start_exit_on_failure)
-                        throw new RuntimeException("Could not catch up with 
peers", failed);
+                    Throwable failed = f.cause();
+                    if (failed != null)
+                    {
+                        if (spec.catchup_on_start_exit_on_failure)
+                            throw new RuntimeException("Could not catchup with 
peers", failed);
 
-                    logger.error("Could not catch up with peers; continuing to 
startup");
-                    break;
-                }
+                        logger.error("Could not catchup with peers; continuing 
to startup");
+                        break;
+                    }
 
-                long end = nanoTime();
-                double seconds = NANOSECONDS.toMillis(end - start)/1000.0;
-                logger.info("Finished catching up with all quorums. {}s 
elapsed.", String.format("%.2f", seconds));
+                    long end = nanoTime();
+                    double seconds = NANOSECONDS.toMillis(end - start)/1000.0;
+                    logger.info("Finished catchup with all quorums. {}s 
elapsed.", String.format("%.2f", seconds));
 
-                if (seconds <= 
spec.catchup_on_start_success_latency.toSeconds())
-                    break;
+                    if (seconds <= 
spec.catchup_on_start_success_latency.toSeconds())
+                        break;
 
-                if (++attempts > spec.catchup_on_start_max_attempts)
-                {
-                    if (spec.catchup_on_start_exit_on_failure)
+                    if (++attempts > spec.catchup_on_start_max_attempts)
                     {
-                        logger.error("Catch up was slow, aborting after {} 
attempts and shutting down", attempts);
-                        throw new RuntimeException("Could not catch up with 
peers");
+                        if (spec.catchup_on_start_exit_on_failure)
+                        {
+                            logger.error("Catchup was slow, aborting after {} 
attempts and shutting down", attempts);
+                            throw new RuntimeException("Could not catchup with 
peers");
+                        }
+
+                        logger.info("Catchup was slow; continuing to startup 
after {} attempts.", attempts - 1);
+                        break;
                     }
 
-                    logger.info("Catch up was slow; continuing to startup 
after {} attempts.", attempts - 1);
-                    break;
+                    logger.info("Catchup was slow, so we may behind again; 
retrying");
                 }
-
-                logger.info("Catch up was slow, so we may behind again; 
retrying");
+            }
+            finally
+            {
+                node.commandStores().forAllUnsafe(commandStore -> 
((DefaultProgressLog)commandStore.unsafeProgressLog()).unsetMode(CATCH_UP));
             }
         }
         else
         {
-            logger.info("Not catching up with quorum, as bootstrap state is 
{}", bootstrapState);
+            logger.info("No catchup, as bootstrap state is {}", 
bootstrapState);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/service/accord/DebugBlockedTxns.java 
b/src/java/org/apache/cassandra/service/accord/DebugBlockedTxns.java
index a4ca0af5e1..7756c712b4 100644
--- a/src/java/org/apache/cassandra/service/accord/DebugBlockedTxns.java
+++ b/src/java/org/apache/cassandra/service/accord/DebugBlockedTxns.java
@@ -96,8 +96,14 @@ public class DebugBlockedTxns
             int c = Integer.compare(this.commandStoreId, that.commandStoreId);
             if (c == 0) c = Integer.compare(this.depth, that.depth);
             if (c == 0) c = this.txnId.compareTo(that.txnId);
+            if (c == 0) c = 
this.blockedViaKeyString().compareTo(that.blockedViaKeyString());
             return c;
         }
+
+        private String blockedViaKeyString()
+        {
+            return blockedViaKey == null ? "" : blockedViaKey.toString();
+        }
     }
 
     final IAccordService service;
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java 
b/src/java/org/apache/cassandra/tcm/Startup.java
index 53a0f6095a..1589e8db17 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -189,7 +189,7 @@ import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
         for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces())
         {
             // Skip system as we've already cleaned it
-            if (keyspace.name.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME))
+            if (keyspace.name.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME) || 
keyspace.name.equals(SchemaConstants.ACCORD_KEYSPACE_NAME))
                 continue;
 
             for (TableMetadata cfm : keyspace.tables)
diff --git 
a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java 
b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
index 99929546f5..20b320dc05 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
@@ -92,7 +92,7 @@ public class MemtableCleanerThread<P extends MemtablePool> 
implements Interrupti
             final int tasks = numPendingTasks.decrementAndGet();
 
             // if the cleaning job was scheduled (res == true) or had an 
error, trigger again after decrementing the tasks
-            if ((res || err != null) && pool.needsCleaning())
+            if (((res != null && res) || err != null) && pool.needsCleaning())
                 wait.signal();
 
             if (err != null)
diff --git 
a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java 
b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
index 260d468af3..ffa8cd62db 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiPredicate;
 
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -193,6 +194,12 @@ public class AccordDebugKeyspaceTest extends CQLTester
     private static final String QUERY_PATTERN_TRACE =
         String.format("SELECT * FROM %s.%s WHERE id = ?", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_PATTERN_TRACE);
 
+    private static final String QUERY_SHARD_EPOCHS =
+        String.format("SELECT * FROM %s.%s", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.SHARD_EPOCHS);
+
+    private static final String QUERY_LISTENERS_DEPS =
+        String.format("SELECT * FROM %s.%s", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.LISTENERS_DEPS);
+
     @BeforeClass
     public static void setUpClass()
     {
@@ -215,6 +222,11 @@ public class AccordDebugKeyspaceTest extends CQLTester
         requireNetwork();
     }
 
+    @After
+    public void afterTest() throws Throwable
+    {
+    }
+
     @Test
     public void unknownIsEmpty()
     {
@@ -492,7 +504,7 @@ public class AccordDebugKeyspaceTest extends CQLTester
             getBlocking(accord.node().coordinate(id, txn));
             filter.apply.awaitThrowUncheckedOnInterrupt();
             spinUntilSuccess(() -> assertRows(execute(QUERY_TXN_BLOCKED_BY, 
id.toString()),
-                                              row(id.toString(), anyInt(), 0, 
"", "", any(), "Applied")));
+                                              row(id.toString(), 0, anyInt(), 
"", "", any(), "Applied")));
             assertRows(execute(QUERY_TXN, id.toString()), row(id.toString(), 
"Applied"));
             assertRows(execute(QUERY_TXN_REMOTE, nodeId, id.toString()), 
row(id.toString(), "Applied"));
             assertRows(execute(QUERY_JOURNAL, id.toString()), 
row(id.toString(), "PreAccepted"), row(id.toString(), "Applying"), 
row(id.toString(), "Applied"), row(id.toString(), null));
@@ -577,14 +589,14 @@ public class AccordDebugKeyspaceTest extends CQLTester
 
             filter.preAccept.awaitThrowUncheckedOnInterrupt();
             assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
-                       row(id.toString(), anyInt(), 0, "", "", any(), 
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+                       row(id.toString(), 0, anyInt(), "", "", any(), 
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
             assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId, 
id.toString()),
-                       row(nodeId, id.toString(), anyInt(), 0, "", "", any(), 
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+                       row(nodeId, id.toString(), 0, anyInt(), "", "", any(), 
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
             filter.apply.awaitThrowUncheckedOnInterrupt();
             assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
-                       row(id.toString(), anyInt(), 0, "", "", any(), 
SaveStatus.ReadyToExecute.name()));
+                       row(id.toString(), 0, anyInt(), "", "", any(), 
SaveStatus.ReadyToExecute.name()));
             assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId, 
id.toString()),
-                       row(nodeId, id.toString(), anyInt(), 0, "", "", any(), 
SaveStatus.ReadyToExecute.name()));
+                       row(nodeId, id.toString(), 0, anyInt(), "", "", any(), 
SaveStatus.ReadyToExecute.name()));
         }
         finally
         {
@@ -619,14 +631,14 @@ public class AccordDebugKeyspaceTest extends CQLTester
 
             filter.preAccept.awaitThrowUncheckedOnInterrupt();
             assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
-                       row(first.toString(), anyInt(), 0, "", any(), any(), 
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+                       row(first.toString(), 0, anyInt(), "", any(), any(), 
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
             assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId, 
first.toString()),
-                       row(nodeId, first.toString(), anyInt(), 0, "", any(), 
any(), anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+                       row(nodeId, first.toString(), 0, anyInt(), "", any(), 
any(), anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
             filter.apply.awaitThrowUncheckedOnInterrupt();
             assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
-                       row(first.toString(), anyInt(), 0, "", any(), 
anyNonNull(), SaveStatus.ReadyToExecute.name()));
+                       row(first.toString(), 0, anyInt(), "", any(), 
anyNonNull(), SaveStatus.ReadyToExecute.name()));
             assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId, 
first.toString()),
-                       row(nodeId, first.toString(), anyInt(), 0, "", any(), 
anyNonNull(), SaveStatus.ReadyToExecute.name()));
+                       row(nodeId, first.toString(), 0, anyInt(), "", any(), 
anyNonNull(), SaveStatus.ReadyToExecute.name()));
 
             filter.reset();
 
@@ -643,15 +655,15 @@ public class AccordDebugKeyspaceTest extends CQLTester
                                               return rs.size() == 2;
                                           });
             assertRows(execute(QUERY_TXN_BLOCKED_BY, second.toString()),
-                       row(second.toString(), anyInt(), 0, "", "", 
anyNonNull(), SaveStatus.Stable.name()),
-                       row(second.toString(), anyInt(), 1, any(), 
first.toString(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
+                       row(second.toString(), 0, anyInt(), "", "", 
anyNonNull(), SaveStatus.Stable.name()),
+                       row(second.toString(), 1, anyInt(), first.toString(), 
any(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
             assertRows(execute(QUERY_TXN_BLOCKED_BY + " AND depth < 1", 
second.toString()),
-                       row(second.toString(), anyInt(), 0, any(), "", 
anyNonNull(), SaveStatus.Stable.name()));
+                       row(second.toString(), 0, anyInt(), "", any(), 
anyNonNull(), SaveStatus.Stable.name()));
             assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId, 
second.toString()),
-                       row(nodeId, second.toString(), anyInt(), 0, "", "", 
anyNonNull(), SaveStatus.Stable.name()),
-                       row(nodeId, second.toString(), anyInt(), 1, any(), 
first.toString(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
+                       row(nodeId, second.toString(), 0, anyInt(), "", "", 
anyNonNull(), SaveStatus.Stable.name()),
+                       row(nodeId, second.toString(), 1, anyInt(), 
first.toString(), any(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
             assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE + " AND depth < 1", 
nodeId, second.toString()),
-                       row(nodeId, second.toString(), anyInt(), 0, any(), "", 
anyNonNull(), SaveStatus.Stable.name()));
+                       row(nodeId, second.toString(), 0, anyInt(), "", any(), 
anyNonNull(), SaveStatus.Stable.name()));
         }
         finally
         {
@@ -727,6 +739,15 @@ public class AccordDebugKeyspaceTest extends CQLTester
         }
     }
 
+    @Test
+    public void testShardEpochsTable()
+    {
+        String table1 = createTable("CREATE TABLE %s (k int, c int, v int, 
PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
+        String table2 = createTable("CREATE TABLE %s (k int, c int, v int, 
PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
+        UntypedResultSet rs = execute(QUERY_SHARD_EPOCHS);
+        Assert.assertTrue(rs.size() > 1);
+    }
+
     private static AccordService accord()
     {
         return (AccordService) AccordService.instance();
diff --git a/test/unit/org/apache/cassandra/journal/TestParams.java 
b/test/unit/org/apache/cassandra/journal/TestParams.java
index edf357a790..d464f00fad 100644
--- a/test/unit/org/apache/cassandra/journal/TestParams.java
+++ b/test/unit/org/apache/cassandra/journal/TestParams.java
@@ -31,6 +31,12 @@ public class TestParams implements Params
         return 32 << 20;
     }
 
+    @Override
+    public int compactMaxSegments()
+    {
+        return 16;
+    }
+
     @Override
     public FailurePolicy failurePolicy()
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to