This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 42f57fb3dc54f72616f3c444ed1a0cc5f4240366
Author: Alex Petrov <[email protected]>
AuthorDate: Wed Jan 22 13:33:19 2025 +0100

    Avoid double loading on the Cassandra side of the journal; make sure to include records appended to the journal.
    
    Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20244
---
 modules/accord                                     |   2 +-
 src/java/org/apache/cassandra/journal/Journal.java | 172 ++++++++++++++++-----
 .../org/apache/cassandra/journal/OnDiskIndex.java  |  12 +-
 .../org/apache/cassandra/journal/Segments.java     |   6 +
 .../service/accord/AccordCommandStore.java         |  58 ++-----
 .../service/accord/AccordCommandStores.java        |   2 +-
 .../cassandra/service/accord/AccordJournal.java    |  89 +++--------
 .../service/accord/AccordJournalTable.java         | 150 ++++++------------
 .../org/apache/cassandra/journal/IndexTest.java    |   4 +-
 .../apache/cassandra/utils/AccordGenerators.java   |   2 +
 10 files changed, 237 insertions(+), 260 deletions(-)

diff --git a/modules/accord b/modules/accord
index cd7f49564a..96ec342423 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit cd7f49564a5ad053286453d10f8cd46b8d870c4f
+Subproject commit 96ec3424230bedf40b834785014a366716df8f42
diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java
index cd7bb53d07..6f0fdabfd3 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -17,20 +17,21 @@
  */
 package org.apache.cassandra.journal;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.file.FileStore;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
-import java.util.PriorityQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.LockSupport;
 import java.util.function.BooleanSupplier;
 import java.util.function.Function;
+import java.util.function.LongConsumer;
 import java.util.function.Predicate;
 import java.util.zip.CRC32;
 
@@ -51,8 +52,11 @@ import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.PathUtils;
 import org.apache.cassandra.journal.Segments.ReferencedSegments;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.Crc;
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.LazyToString;
+import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.Simulate;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
@@ -262,11 +266,11 @@ public class Journal<K, V> implements Shutdownable
             compactor.shutdown();
             compactor.awaitTermination(1, TimeUnit.MINUTES);
             flusher.shutdown();
-            closer.shutdown();
+            closeAllSegments();
             releaser.shutdown();
+            closer.shutdown();
             closer.awaitTermination(1, TimeUnit.MINUTES);
             releaser.awaitTermination(1, TimeUnit.MINUTES);
-            closeAllSegments();
             metrics.deregister();
             Invariants.require(state.compareAndSet(State.SHUTDOWN, 
State.TERMINATED),
                                   "Unexpected journal state while trying to 
shut down", state);
@@ -921,64 +925,156 @@ public class Journal<K, V> implements Shutdownable
         void write(DataOutputPlus out, int userVersion) throws IOException;
     }
 
-    public StaticSegmentIterator staticSegmentIterator()
+    /**
+     * Static segment iterator iterates all keys in _static_ segments in order.
+     */
+    public StaticSegmentKeyIterator staticSegmentKeyIterator()
     {
-        return new StaticSegmentIterator();
+        return new StaticSegmentKeyIterator();
     }
 
     /**
-     * Static segment iterator iterates all _static_ segments in _key_ order.
+     * List of key and a list of segment descriptors referencing this key
      */
-    public class StaticSegmentIterator implements Closeable
+    public static class KeyRefs<K>
     {
-        // TODO (expected): use MergeIterator
-        private final PriorityQueue<StaticSegment.KeyOrderReader<K>> readers;
-        private final ReferencedSegments<K, V> segments;
+        long segments[];
+        K key;
+        int size;
 
-        private StaticSegmentIterator()
+        public KeyRefs(K key)
         {
-            this.segments = selectAndReference(Segment::isStatic);
-            this.readers = new PriorityQueue<>();
-            for (Segment<K, V> segment : this.segments.all())
-            {
-                StaticSegment<K, V> staticSegment = (StaticSegment<K, 
V>)segment;
-                StaticSegment.KeyOrderReader<K> reader = 
staticSegment.keyOrderReader();
-                if (reader.advance())
-                    this.readers.add(reader);
-                else
-                    reader.close();
-            }
+            this.key = key;
+        }
+
+        private KeyRefs(int maxSize)
+        {
+            this.segments = new long[maxSize];
+        }
+
+        public void segments(LongConsumer consumer)
+        {
+            for (int i = 0; i < size; i++)
+                consumer.accept(segments[i]);
         }
 
         public K key()
         {
-            StaticSegment.KeyOrderReader<K> reader = readers.peek();
-            if (reader == null)
-                return null;
-            return reader.key();
+            return key;
+        }
+
+        private void add(K key, long segment)
+        {
+            this.key = key;
+            if (size == 0 || segments[size - 1] < segment)
+                segments[size++] = segment;
+            else
+                Invariants.require(segments[size - 1] == segment,
+                                   "Tried to add an out-of-order segment: %d, 
%s", segment,
+                                   LazyToString.lazy(() -> 
Arrays.toString(Arrays.copyOf(segments, size))));
         }
 
-        public void readAllForKey(K key, RecordConsumer<K> reader)
+        private void reset()
+        {
+            key = null;
+            size = 0;
+            Arrays.fill(segments, 0);
+        }
+    }
+
+    public class StaticSegmentKeyIterator implements 
CloseableIterator<KeyRefs<K>>
+    {
+        private final ReferencedSegments<K, V> segments;
+        private final MergeIterator<Head, KeyRefs<K>> iterator;
+
+        public StaticSegmentKeyIterator()
         {
-            while (true)
+            this.segments = selectAndReference(Segment::isStatic);
+            List<Iterator<Head>> iterators = new ArrayList<>(segments.count());
+
+            for (Segment<K, V> segment : segments.allSorted(true))
             {
-                StaticSegment.KeyOrderReader<K> next = readers.peek();
-                if (next == null || !next.key().equals(key))
-                    break;
-                Invariants.require(next == readers.poll());
+                StaticSegment<K, V> staticSegment = (StaticSegment<K, V>) 
segment;
+                Iterator<K> iter = staticSegment.index().reader();
+                Head head = new Head(staticSegment.descriptor.timestamp);
+                iterators.add(new Iterator<>()
+                {
+                    public boolean hasNext()
+                    {
+                        return iter.hasNext();
+                    }
 
-                reader.accept(next.descriptor.timestamp, next.offset, 
next.key(), next.record(), next.descriptor.userVersion);
-                if (next.advance())
-                    readers.add(next);
-                else
-                    next.close();
+                    public Head next()
+                    {
+                        head.key = iter.next();
+                        return head;
+                    }
+                });
             }
+
+            this.iterator = MergeIterator.get(iterators,
+                                              (r1, r2) -> {
+                                                  int keyCmp = 
keySupport.compare(r1.key, r2.key);
+                                                  if (keyCmp != 0)
+                                                      return keyCmp;
+                                                  return 
Long.compare(r1.segment, r2.segment);
+                                              },
+                                              new MergeIterator.Reducer<Head, 
KeyRefs<K>>()
+                                              {
+                                                  final KeyRefs<K> ret = new 
KeyRefs<>(segments.count());
+
+                                                  @Override
+                                                  public void reduce(int idx, 
Head head)
+                                                  {
+                                                      ret.add(head.key, 
head.segment);
+                                                  }
+
+                                                  @Override
+                                                  protected KeyRefs<K> 
getReduced()
+                                                  {
+                                                      return ret;
+                                                  }
+
+                                                  @Override
+                                                  protected void onKeyChange()
+                                                  {
+                                                      ret.reset();
+                                                      super.onKeyChange();
+                                                  }
+                                              });
         }
 
+        @Override
         public void close()
         {
             segments.close();
         }
+
+        public KeyRefs<K> peek()
+        {
+            if (iterator.hasNext())
+                return iterator.peek();
+            return null;
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+            return iterator.hasNext();
+        }
+
+        @Override
+        public KeyRefs<K> next()
+        {
+            return iterator.next();
+        }
+
+        class Head
+        {
+            final long segment;
+            K key;
+            Head(long segment) { this.segment = segment; }
+        }
     }
 
     enum State
diff --git a/src/java/org/apache/cassandra/journal/OnDiskIndex.java b/src/java/org/apache/cassandra/journal/OnDiskIndex.java
index 17b8a7b791..8c662d889a 100644
--- a/src/java/org/apache/cassandra/journal/OnDiskIndex.java
+++ b/src/java/org/apache/cassandra/journal/OnDiskIndex.java
@@ -24,7 +24,6 @@ import java.nio.file.StandardOpenOption;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.zip.CRC32;
-
 import javax.annotation.Nullable;
 
 import accord.utils.Invariants;
@@ -32,6 +31,7 @@ import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.Crc;
 
 import static org.apache.cassandra.journal.Journal.validateCRC;
@@ -244,7 +244,7 @@ final class OnDiskIndex<K> extends Index<K>
         return new IndexReader();
     }
 
-    public class IndexReader
+    public class IndexReader extends AbstractIterator<K>
     {
         int idx;
         K key;
@@ -256,10 +256,12 @@ final class OnDiskIndex<K> extends Index<K>
             idx = -1;
         }
 
-        public K key()
+        protected K computeNext()
         {
-            ensureAdvanced();
-            return key;
+            if (advance())
+                return key;
+            else
+                return endOfData();
         }
 
         public int offset()
diff --git a/src/java/org/apache/cassandra/journal/Segments.java b/src/java/org/apache/cassandra/journal/Segments.java
index cb38a489ca..a2475557a6 100644
--- a/src/java/org/apache/cassandra/journal/Segments.java
+++ b/src/java/org/apache/cassandra/journal/Segments.java
@@ -206,6 +206,12 @@ class Segments<K, V>
             this.refs = refs;
         }
 
+        public int count()
+        {
+            if (refs == null) return 0;
+            else return refs.size();
+        }
+
         @Override
         public void close()
         {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index c87cd41baf..8086cecdbc 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -45,14 +45,12 @@ import accord.local.Command;
 import accord.local.CommandStore;
 import accord.local.CommandStores;
 import accord.local.Commands;
-import accord.local.KeyHistory;
 import accord.local.NodeCommandStoreService;
 import accord.local.PreLoadContext;
 import accord.local.RedundantBefore;
 import accord.local.SafeCommandStore;
 import accord.local.cfk.CommandsForKey;
 import accord.primitives.PartialTxn;
-import accord.primitives.Participants;
 import accord.primitives.RangeDeps;
 import accord.primitives.Ranges;
 import accord.primitives.RoutableKey;
@@ -70,11 +68,6 @@ import static accord.api.Journal.CommandUpdate;
 import static accord.api.Journal.FieldUpdates;
 import static accord.api.Journal.Load.MINIMAL;
 import static accord.api.Journal.Loader;
-import static accord.api.Journal.OnDone;
-import static accord.local.KeyHistory.SYNC;
-import static accord.primitives.Status.Committed;
-import static accord.primitives.Status.PreCommitted;
-import static accord.primitives.Status.Truncated;
 import static accord.utils.Invariants.require;
 
 public class AccordCommandStore extends CommandStore
@@ -188,9 +181,9 @@ public class AccordCommandStore extends CommandStore
         maybeLoadRangesForEpoch(journal.loadRangesForEpoch(id()));
     }
 
-    static Factory factory(Journal journal, IntFunction<AccordExecutor> 
executorFactory)
+    static Factory factory(IntFunction<AccordExecutor> executorFactory)
     {
-        return (id, node, agent, dataStore, progressLogFactory, 
listenerFactory, rangesForEpoch) ->
+        return (id, node, agent, dataStore, progressLogFactory, 
listenerFactory, rangesForEpoch, journal) ->
                new AccordCommandStore(id, node, agent, dataStore, 
progressLogFactory, listenerFactory, rangesForEpoch, journal, 
executorFactory.apply(id));
     }
 
@@ -489,48 +482,15 @@ public class AccordCommandStore extends CommandStore
             this.store = store;
         }
 
-        private PreLoadContext context(Command command, KeyHistory keyHistory)
-        {
-            TxnId txnId = command.txnId();
-            Participants<?> keys = null;
-            if (CommandsForKey.manages(txnId))
-                keys = command.hasBeen(Committed) ? 
command.participants().hasTouched() : command.participants().touches();
-            else if (!CommandsForKey.managesExecution(txnId) && 
command.hasBeen(PreCommitted) && !command.hasBeen(Truncated))
-                keys = command.asCommitted().waitingOn.keys;
-
-            if (keys != null)
-                return PreLoadContext.contextFor(txnId, keys, keyHistory);
-
-            return txnId;
-        }
-
-        @Override
-        public void load(Command command, OnDone onDone)
-        {
-            store.execute(context(command, SYNC), safeStore -> 
loadInternal(command, safeStore))
-                 .begin((unused, throwable) -> {
-                     if (throwable != null)
-                         onDone.failure(throwable);
-                     else
-                         onDone.success();
-                 });
-        }
-
         @Override
-        public void apply(Command command, OnDone onDone)
+        public AsyncChain<Command> load(TxnId txnId)
         {
-            PreLoadContext context = context(command, SYNC);
-            store.execute(context, safeStore -> {
-                     applyWrites(command.txnId(), safeStore, (safeCommand, 
cmd) -> {
-                         Commands.applyWrites(safeStore, context, 
cmd).begin(store.agent);
-                     });
-                 })
-                 .begin((unused, throwable) -> {
-                     if (throwable != null)
-                         onDone.failure(throwable);
-                     else
-                         onDone.success();
-                 });
+            return store.submit(txnId, safeStore -> {
+                maybeApplyWrites(txnId, safeStore, (safeCommand, cmd) -> {
+                    Commands.applyWrites(safeStore, txnId, 
cmd).begin(store.agent);
+                });
+                return safeStore.unsafeGet(txnId).current();
+            });
         }
     }
 
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
index 7188628772..37beb2c4fb 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
@@ -71,7 +71,7 @@ public class AccordCommandStores extends CommandStores implements CacheSize
                         Journal journal, AccordExecutor[] executors)
     {
         super(node, agent, store, random, journal, shardDistributor, 
progressLogFactory, listenerFactory,
-              AccordCommandStore.factory(journal, id -> executors[id % 
executors.length]));
+              AccordCommandStore.factory(id -> executors[id % 
executors.length]));
         this.executors = executors;
         this.cacheSizeMetrics = new CacheSizeMetrics(ACCORD_STATE_CACHE, this);
         cacheSize = DatabaseDescriptor.getAccordCacheSizeInMiB() << 20;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 9be8739416..fd435a90d7 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -25,7 +25,6 @@ import java.util.Iterator;
 import java.util.NavigableMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.function.BiConsumer;
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -48,13 +47,13 @@ import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.utils.Invariants;
 import accord.utils.PersistentField;
+import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
 import org.apache.cassandra.concurrent.Shutdownable;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -74,13 +73,13 @@ import org.apache.cassandra.service.accord.serializers.CommandSerializers.Execut
 import org.apache.cassandra.service.accord.serializers.DepsSerializers;
 import org.apache.cassandra.service.accord.serializers.ResultSerializers;
 import org.apache.cassandra.service.accord.serializers.WaitingOnSerializer;
+import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.ExecutorUtils;
-import org.apache.cassandra.utils.concurrent.AsyncPromise;
 
 import static accord.impl.CommandChange.anyFieldChanged;
-import static accord.impl.CommandChange.isNull;
 import static accord.impl.CommandChange.getFlags;
 import static accord.impl.CommandChange.isChanged;
+import static accord.impl.CommandChange.isNull;
 import static accord.impl.CommandChange.nextSetField;
 import static accord.impl.CommandChange.setChanged;
 import static accord.impl.CommandChange.setFieldIsNull;
@@ -90,7 +89,6 @@ import static accord.impl.CommandChange.validateFlags;
 import static accord.local.Cleanup.Input.FULL;
 import static accord.primitives.SaveStatus.Erased;
 import static accord.primitives.SaveStatus.Vestigial;
-import static accord.primitives.Status.Truncated;
 import static 
org.apache.cassandra.service.accord.AccordJournalValueSerializers.DurableBeforeAccumulator;
 
 public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier, Shutdownable
@@ -439,55 +437,34 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier
         journalTable.forceCompaction();
     }
 
-    @Override
+    @SuppressWarnings("unchecked") @Override
     public void replay(CommandStores commandStores)
     {
         journal.closeCurrentSegmentForTestingIfNonEmpty();
-        try (AccordJournalTable.KeyOrderIterator<JournalKey> iter = 
journalTable.readAll())
+        try (CloseableIterator<Journal.KeyRefs<JournalKey>> iter = 
journalTable.keyIterator())
         {
-            JournalKey key;
-            Builder builder = new Builder();
-
-            while ((key = iter.key()) != null)
+            while (iter.hasNext())
             {
-                builder.reset(key.id);
-                if (key.type != JournalKey.Type.COMMAND_DIFF)
-                {
-                    // TODO (required): add "skip" for the key to avoid 
getting stuck
-                    iter.readAllForKey(key, (segment, position, key1, buffer, 
userVersion) -> {});
+                Journal.KeyRefs<JournalKey> ref = iter.next();
+
+                if (ref.key().type != JournalKey.Type.COMMAND_DIFF)
                     continue;
-                }
 
-                JournalKey finalKey = key;
-                iter.readAllForKey(key, (segment, position, local, buffer, 
userVersion) -> {
-                    Invariants.require(finalKey.equals(local));
-                    try (DataInputBuffer in = new DataInputBuffer(buffer, 
false))
-                    {
-                        builder.deserializeNext(in, userVersion);
-                        if (journalTable.shouldIndex(finalKey)
-                            && builder.participants() != null
-                            && builder.participants().route() != null)
-                            journalTable.safeNotify(index ->
-                                                    index.update(segment, 
finalKey.commandStoreId, finalKey.id, builder.participants().route()));
-                    }
-                    catch (IOException e)
-                    {
-                        // can only throw if serializer is buggy
-                        throw new RuntimeException(e);
-                    }
-                });
-
-                if (!builder.isEmpty())
-                {
-                    CommandStore commandStore = 
commandStores.forId(key.commandStoreId);
-                    Command command = 
builder.construct(commandStore.unsafeGetRedundantBefore());
-                    Invariants.require(command.saveStatus() != 
SaveStatus.Uninitialised,
-                                          "Found uninitialized command in the 
log: %s %s", command.toString(), builder.toString());
-                    Loader loader = commandStore.loader();
-                    async(loader::load, command).get();
-                    if (command.saveStatus().compareTo(SaveStatus.Stable) >= 0 
&& !command.hasBeen(Truncated))
-                        async(loader::apply, command).get();
-                }
+                CommandStore commandStore = 
commandStores.forId(ref.key().commandStoreId);
+                Loader loader = commandStore.loader();
+                AsyncChains.getUnchecked(loader.load(ref.key().id)
+                                               .map(command -> {
+                                                   if 
(journalTable.shouldIndex(ref.key())
+                                                       && 
command.participants() != null
+                                                       && 
command.participants().route() != null)
+                                                   {
+                                                       ref.segments(segment -> 
{
+                                                           
journalTable.safeNotify(index -> index.update(segment, 
ref.key().commandStoreId, ref.key().id, command.participants().route()));
+                                                       });
+                                                   }
+                                                   return command;
+                                               })
+                                               .beginAsResult());
             }
         }
         catch (Throwable t)
@@ -496,24 +473,6 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier
         }
     }
 
-    private AsyncPromise<?> async(BiConsumer<Command, OnDone> consumer, 
Command command)
-    {
-        AsyncPromise<?> future = new AsyncPromise<>();
-        consumer.accept(command, new OnDone()
-        {
-            public void success()
-            {
-                future.setSuccess(null);
-            }
-
-            public void failure(Throwable t)
-            {
-                future.setFailure(t);
-            }
-        });
-        return future;
-    }
-
     public static @Nullable ByteBuffer asSerializedChange(Command before, 
Command after, int userVersion) throws IOException
     {
         try (DataOutputBuffer out = new DataOutputBuffer())
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
index 95267a6b79..f4c96fa122 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
@@ -17,16 +17,16 @@
  */
 package org.apache.cassandra.service.accord;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.function.Consumer;
-
+import javax.annotation.CheckForNull;
 import javax.annotation.Nullable;
 
+import com.google.common.collect.AbstractIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -153,11 +153,11 @@ public class AccordJournalTable<K extends JournalKey, V> implements RangeSearche
         void read(DataInputPlus input, int userVersion) throws IOException;
     }
 
-    private abstract class AbstractRecordConsumer implements RecordConsumer<K>
+    private class RecordConsumerAdapter implements RecordConsumer<K>
     {
         protected final Reader reader;
 
-        AbstractRecordConsumer(Reader reader)
+        RecordConsumerAdapter(Reader reader)
         {
             this.reader = reader;
         }
@@ -169,13 +169,13 @@ public class AccordJournalTable<K extends JournalKey, V> implements RangeSearche
         }
     }
 
-    private class TableRecordConsumer extends AbstractRecordConsumer
+    private class TableRecordConsumer implements RecordConsumer<K>
     {
         protected LongHashSet visited = null;
-
-        TableRecordConsumer(Reader reader)
+        protected RecordConsumer<K> delegate;
+        TableRecordConsumer(RecordConsumer<K> delegate)
         {
-            super(reader);
+            this.delegate = delegate;
         }
 
         void visit(long segment)
@@ -194,20 +194,20 @@ public class AccordJournalTable<K extends JournalKey, V> implements RangeSearche
         public void accept(long segment, int position, K key, ByteBuffer 
buffer, int userVersion)
         {
             visit(segment);
-            super.accept(segment, position, key, buffer, userVersion);
+            delegate.accept(segment, position, key, buffer, userVersion);
         }
     }
 
-    private class JournalAndTableRecordConsumer extends AbstractRecordConsumer
+    private class JournalAndTableRecordConsumer implements RecordConsumer<K>
     {
         private final K key;
         private final TableRecordConsumer tableRecordConsumer;
-
-        JournalAndTableRecordConsumer(K key, Reader reader)
+        private final RecordConsumer<K> delegate;
+        JournalAndTableRecordConsumer(K key, RecordConsumer<K> reader)
         {
-            super(reader);
             this.key = key;
             this.tableRecordConsumer = new TableRecordConsumer(reader);
+            this.delegate = reader;
         }
 
         void readTable()
@@ -219,7 +219,7 @@ public class AccordJournalTable<K extends JournalKey, V> implements RangeSearche
         public void accept(long segment, int position, K key, ByteBuffer 
buffer, int userVersion)
         {
             if (!tableRecordConsumer.visited(segment))
-                super.accept(segment, position, key, buffer, userVersion);
+                delegate.accept(segment, position, key, buffer, userVersion);
         }
     }
 
@@ -340,6 +340,11 @@ public class AccordJournalTable<K extends JournalKey, V> implements RangeSearche
      * When reading from journal segments, skip descriptors that were read 
from the table.
      */
     public void readAll(K key, Reader reader, boolean asc)
+    {
+        readAll(key, new RecordConsumerAdapter(reader), asc);
+    }
+
+    public void readAll(K key, RecordConsumer<K> reader, boolean asc)
     {
         JournalAndTableRecordConsumer consumer = new 
JournalAndTableRecordConsumer(key, reader);
         journal.readAll(key, consumer, asc);
@@ -386,19 +391,16 @@ public class AccordJournalTable<K extends JournalKey, V> implements RangeSearche
     }
 
     @SuppressWarnings("resource") // Auto-closeable iterator will release 
related resources
-    public KeyOrderIterator<K> readAll()
+    public CloseableIterator<Journal.KeyRefs<K>> keyIterator()
     {
         return new JournalAndTableKeyIterator();
     }
 
-    private class TableIterator implements Closeable
+    private class TableIterator extends AbstractIterator<K> implements 
CloseableIterator<K>
     {
         private final UnfilteredPartitionIterator mergeIterator;
         private final RefViewFragment view;
 
-        private UnfilteredRowIterator partition;
-        private LongHashSet visited = null;
-
         private TableIterator()
         {
             view = cfs.selectAndReference(v -> v.select(SSTableSet.LIVE));
@@ -407,57 +409,24 @@ public class AccordJournalTable<K extends JournalKey, V> 
implements RangeSearche
                 scanners.add(sstable.getScanner());
 
             mergeIterator = view.sstables.isEmpty()
-                     ? EmptyIterators.unfilteredPartition(cfs.metadata())
-                     : UnfilteredPartitionIterators.merge(scanners, 
UnfilteredPartitionIterators.MergeListener.NOOP);
+                            ? 
EmptyIterators.unfilteredPartition(cfs.metadata())
+                            : UnfilteredPartitionIterators.merge(scanners, 
UnfilteredPartitionIterators.MergeListener.NOOP);
         }
 
-        public JournalKey key()
+        @CheckForNull
+        protected K computeNext()
         {
-            if (partition == null)
+            if (mergeIterator.hasNext())
             {
-                if (mergeIterator.hasNext())
-                    partition = mergeIterator.next();
-                else
-                    return null;
-            }
-
-            return 
AccordKeyspace.JournalColumns.getJournalKey(partition.partitionKey());
-        }
-
-        protected void readAllForKey(K key, RecordConsumer<K> recordConsumer)
-        {
-            while (partition.hasNext())
-            {
-                EntryHolder<K> into = new EntryHolder<>();
-                // TODO: use flyweight to avoid allocating extra lambdas?
-                readRow(key, partition.next(), into, (segment, position, key1, 
buffer, userVersion) -> {
-                    visit(segment);
-                    recordConsumer.accept(segment, position, key1, buffer, 
userVersion);
-                });
+                try (UnfilteredRowIterator partition = mergeIterator.next())
+                {
+                    return (K) 
AccordKeyspace.JournalColumns.getJournalKey(partition.partitionKey());
+                }
             }
-
-            partition = null;
+            else
+                return endOfData();
         }
 
-        void visit(long segment)
-        {
-            if (visited == null)
-                visited = new LongHashSet();
-            visited.add(segment);
-        }
-
-        boolean visited(long segment)
-        {
-            return visited != null && visited.contains(segment);
-        }
-
-
-        void clear()
-        {
-            visited = null;
-        }
-
-
         @Override
         public void close()
         {
@@ -466,62 +435,46 @@ public class AccordJournalTable<K extends JournalKey, V> 
implements RangeSearche
         }
     }
 
-    private class JournalAndTableKeyIterator implements KeyOrderIterator<K>
+    private class JournalAndTableKeyIterator extends 
AbstractIterator<Journal.KeyRefs<K>> implements 
CloseableIterator<Journal.KeyRefs<K>>
     {
         final TableIterator tableIterator;
-        final Journal<K, V>.StaticSegmentIterator staticSegmentIterator;
+        final Journal<K, V>.StaticSegmentKeyIterator journalIterator;
 
         private JournalAndTableKeyIterator()
         {
             this.tableIterator = new TableIterator();
-            this.staticSegmentIterator = journal.staticSegmentIterator();
+            this.journalIterator = journal.staticSegmentKeyIterator();
         }
 
-        @Override
-        public K key()
+        protected Journal.KeyRefs<K> computeNext()
         {
-            // TODO (expected): fix generics mismatch here
-            K tableKey = (K)tableIterator.key();
-            K journalKey = staticSegmentIterator.key();
+            K tableKey = tableIterator.hasNext() ? tableIterator.peek() : null;
+            Journal.KeyRefs<K> journalKey = journalIterator.hasNext() ? 
journalIterator.peek() : null;
+
             if (tableKey == null)
-                return journalKey;
-            if (journalKey == null || keySupport.compare(tableKey, journalKey) 
> 0)
-                return journalKey;
+                return journalKey == null ? endOfData() : 
journalIterator.next();
 
-            return tableKey;
-        }
+            if (journalKey == null)
+                return new Journal.KeyRefs<>(tableIterator.next());
 
-        @Override
-        public void readAllForKey(K key, RecordConsumer<K> reader)
-        {
-            K tableKey = (K)tableIterator.key();
-            K journalKey = staticSegmentIterator.key();
-            if (journalKey != null && keySupport.compare(journalKey, key) == 0)
-                staticSegmentIterator.readAllForKey(key, (segment, position, 
key1, buffer, userVersion) -> {
-                    if (!tableIterator.visited(segment))
-                        reader.accept(segment, position, key1, buffer, 
userVersion);
-                });
-
-            if (tableKey != null && keySupport.compare(tableKey, key) == 0)
-                tableIterator.readAllForKey(key, reader);
-
-            tableIterator.clear();
+            int cmp = keySupport.compare(tableKey, journalKey.key());
+            if (cmp == 0)
+            {
+                tableIterator.next();
+                return journalIterator.next();
+            }
+
+            // Emit the smaller key first so the merged stream stays in key order
+            return cmp < 0 ? new Journal.KeyRefs<>(tableIterator.next()) : 
journalIterator.next();
         }
 
         public void close()
         {
             tableIterator.close();
-            staticSegmentIterator.close();
+            journalIterator.close();
         }
     }
 
-    public interface KeyOrderIterator<K> extends Closeable
-    {
-        K key();
-        void readAllForKey(K key, RecordConsumer<K> reader);
-        void close();
-    }
-
     public static void readBuffer(ByteBuffer buffer, Reader reader, int 
userVersion)
     {
         try (DataInputBuffer in = new DataInputBuffer(buffer, false))
diff --git a/test/unit/org/apache/cassandra/journal/IndexTest.java 
b/test/unit/org/apache/cassandra/journal/IndexTest.java
index 43e2e21fa9..acde8cc69c 100644
--- a/test/unit/org/apache/cassandra/journal/IndexTest.java
+++ b/test/unit/org/apache/cassandra/journal/IndexTest.java
@@ -238,10 +238,10 @@ public class IndexTest
 
             OnDiskIndex<TimeUUID>.IndexReader iter = onDisk.reader();
             Iterator<Pair<TimeUUID, Long>> expectedIter = 
sortedEntries.iterator();
-            while (iter.advance())
+            while (iter.hasNext())
             {
                 Pair<TimeUUID, Long> expected = expectedIter.next();
-                Assert.assertEquals(iter.key(), expected.left);
+                Assert.assertEquals(iter.next(), expected.left);
                 Assert.assertEquals(iter.recordSize(), 
Index.readSize(expected.right));
                 Assert.assertEquals(iter.offset(), 
Index.readOffset(expected.right));
             }
diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java 
b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
index 1b60d0a3b5..73e4b347ad 100644
--- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
@@ -230,6 +230,8 @@ public class AccordGenerators
             builder.promised(promised);
             if (saveStatus.status.compareTo(Status.PreAccepted) > 0)
                 builder.acceptedOrCommitted(accepted);
+            else
+                builder.acceptedOrCommitted(Ballot.ZERO);
             if (saveStatus.compareTo(SaveStatus.Stable) >= 0 && 
!saveStatus.hasBeen(Status.Truncated))
                 builder.waitingOn(waitingOn);
             if (saveStatus.hasBeen(Status.PreApplied) && 
!saveStatus.hasBeen(Status.Truncated))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to