This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9ed9c7cacd Iterate entries in reverse order, as we can not rely on switch atomicity
9ed9c7cacd is described below

commit 9ed9c7cacd3faac8af83b9f45d18f6c3fdc6d58c
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Thu Aug 7 13:46:06 2025 +0200

    Iterate entries in reverse order, as we can not rely on switch atomicity
    
    Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20837
---
 .../service/accord/AccordJournalTable.java         | 200 ++++++++++-----------
 .../cassandra/tools/StandaloneJournalUtil.java     |  52 ++++++
 .../cassandra/utils/JVMStabilityInspector.java     |   3 -
 3 files changed, 147 insertions(+), 108 deletions(-)

diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
index 0e8cff3bb5..3c97b02dcd 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
@@ -36,7 +36,6 @@ import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.utils.Invariants;
 import accord.utils.UncheckedInterruptedException;
-import org.agrona.collections.LongHashSet;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.Operator;
@@ -61,7 +60,6 @@ import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.dht.Bounds;
@@ -73,7 +71,6 @@ import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.journal.EntrySerializer.EntryHolder;
 import org.apache.cassandra.journal.Journal;
 import org.apache.cassandra.journal.KeySupport;
 import org.apache.cassandra.journal.RecordConsumer;
@@ -83,10 +80,12 @@ import org.apache.cassandra.service.accord.AccordKeyspace.JournalColumns;
 import org.apache.cassandra.service.accord.api.TokenKey;
 import org.apache.cassandra.service.accord.serializers.CommandSerializers;
 import org.apache.cassandra.service.accord.serializers.Version;
+import org.apache.cassandra.utils.Closeable;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 
 import static org.apache.cassandra.io.sstable.SSTableReadsListener.NOOP_LISTENER;
 import static org.apache.cassandra.service.accord.AccordKeyspace.JournalColumns.getJournalKey;
@@ -221,63 +220,6 @@ public class AccordJournalTable<K extends JournalKey, V> implements RangeSearche
         }
     }
 
-    // TODO (expected): this can be removed entirely when we "flush" segments directly to sstables (but we perhaps need to be careful about the active segment)
-    private class TableRecordConsumer implements RecordConsumer<K>
-    {
-        final LongHashSet visited;
-        final RecordConsumer<K> delegate;
-
-        TableRecordConsumer(LongHashSet visited, RecordConsumer<K> delegate)
-        {
-            this.visited = visited;
-            this.delegate = delegate;
-        }
-
-        boolean visited(long segment)
-        {
-            return visited != null && visited.contains(segment);
-        }
-
-        @Override
-        public void accept(long segment, int position, K key, ByteBuffer buffer, int userVersion)
-        {
-            if (!visited(segment))
-                delegate.accept(segment, position, key, buffer, userVersion);
-        }
-    }
-
-    private class JournalAndTableRecordConsumer implements RecordConsumer<K>
-    {
-        private final K key;
-        private final RecordConsumer<K> delegate;
-        private LongHashSet visited;
-
-        void visit(long segment)
-        {
-            if (visited == null)
-                visited = new LongHashSet();
-            visited.add(segment);
-        }
-
-        JournalAndTableRecordConsumer(K key, RecordConsumer<K> reader)
-        {
-            this.key = key;
-            this.delegate = reader;
-        }
-
-        void readTable()
-        {
-            readAllFromTable(key, new TableRecordConsumer(visited, delegate));
-        }
-
-        @Override
-        public void accept(long segment, int position, K key, ByteBuffer buffer, int userVersion)
-        {
-            visit(segment);
-            delegate.accept(segment, position, key, buffer, userVersion);
-        }
-    }
-
     /**
      * When using {@link PartitionRangeReadCommand} we need to work with {@link RowFilter} which works with columns.
      * But the index doesn't care about table based queries and needs to be queried using the fields in the index, to
@@ -405,71 +347,119 @@ public class AccordJournalTable<K extends JournalKey, V> implements RangeSearche
      */
     public void readAll(K key, Reader reader)
     {
-        readAll(key, new RecordConsumerAdapter(reader));
+        readAll(key, new RecordConsumerAdapter<>(reader));
     }
 
     public void readAll(K key, RecordConsumer<K> reader)
     {
-        JournalAndTableRecordConsumer consumer = new JournalAndTableRecordConsumer(key, reader);
-        journal.readAll(key, consumer);
-        consumer.readTable();
+        try (TableKeyIterator table = readAllFromTable(key))
+        {
+            boolean hasTableData = table.advance();
+            long minSegment = hasTableData ? table.segment : Long.MIN_VALUE;
+            // First, read all journal entries newer than anything flushed into sstables
+            journal.readAll(key, (segment, position, key1, buffer, userVersion) -> {
+                if (segment > minSegment)
+                    reader.accept(segment, position, key1, buffer, userVersion);
+            });
+
+            // Then, read SSTables
+            while (hasTableData)
+            {
+                reader.accept(table.segment, table.offset, key, table.value, table.userVersion);
+                hasTableData = table.advance();
+            }
+        }
     }
-
-    private void readAllFromTable(K key, TableRecordConsumer onEntry)
+    
+    // TODO (expected): why are recordColumn and versionColumn instance fields, so that this cannot be a static class?
+    class TableKeyIterator implements Closeable, RecordConsumer<K>
     {
-        DecoratedKey pk = JournalColumns.decorate(key);
-        try (RefViewFragment view = cfs.selectAndReference(View.select(SSTableSet.LIVE, pk)))
+        final K key;
+        final List<UnfilteredRowIterator> unmerged;
+        final UnfilteredRowIterator merged;
+        final OpOrder.Group readOrder;
+
+        long segment;
+        int offset;
+        ByteBuffer value;
+        int userVersion;
+
+        TableKeyIterator(K key, List<UnfilteredRowIterator> unmerged, UnfilteredRowIterator merged, OpOrder.Group readOrder)
         {
-            if (view.sstables.isEmpty())
-                return;
+            this.key = key;
+            this.unmerged = unmerged;
+            this.merged = merged;
+            this.readOrder = readOrder;
+        }
 
-            List<UnfilteredRowIterator> iters = new ArrayList<>(Math.min(4, view.sstables.size()));
-            try
-            {
-                for (SSTableReader sstable : view.sstables)
-                {
-                    if (!sstable.mayContainAssumingKeyIsInRange(pk))
-                        continue;
+        @Override
+        public void accept(long segment, int offset, K key, ByteBuffer buffer, int userVersion)
+        {
+            this.segment = segment;
+            this.offset = offset;
+            this.value = buffer;
+            this.userVersion = userVersion;
+        }
 
-                    UnfilteredRowIterator iter = StorageHook.instance.makeRowIterator(cfs, sstable, pk, Slices.ALL, ColumnFilter.all(cfs.metadata()), false, NOOP_LISTENER);
-                    if (iter.getClass() != EmptyIterators.EmptyUnfilteredRowIterator.class)
-                        iters.add(iter);
-                }
+        boolean advance()
+        {
+            if (merged == null || !merged.hasNext())
+                return false;
 
-                if (!iters.isEmpty())
-                {
-                    EntryHolder<K> into = new EntryHolder<>();
-                    try (UnfilteredRowIterator iter = UnfilteredRowIterators.merge(iters))
-                    {
-                        while (iter.hasNext()) readRow(key, iter.next(), into, onEntry);
-                    }
-                }
+            try
+            {
+                Row row = (Row) merged.next();
+                segment = LongType.instance.compose(ByteBuffer.wrap((byte[]) row.clustering().get(0)));
+                offset = Int32Type.instance.compose(ByteBuffer.wrap((byte[]) row.clustering().get(1)));
+                value = row.getCell(recordColumn).buffer();
+                userVersion = Int32Type.instance.compose(row.getCell(versionColumn).buffer());
+                return true;
             }
             catch (Throwable t)
             {
-                String message = "Failed to read from " + iters;
-                for (UnfilteredRowIterator iter : iters)
-                {
-                    try { iter.close(); }
-                    catch (Throwable t2) { t.addSuppressed(t2); }
-                }
-                throw new FSReadError(message, t);
+                throw new FSReadError("Failed to read from " + unmerged, t);
             }
         }
+
+        @Override
+        public void close()
+        {
+            // close the sstable iterators before releasing the read order that keeps their sstables alive
+            try { if (merged != null) merged.close(); }
+            finally { readOrder.close(); }
+        }
     }
 
-    private void readRow(K key, Unfiltered unfiltered, EntryHolder<K> into, RecordConsumer<K> onEntry)
+    private TableKeyIterator readAllFromTable(K key)
     {
-        Invariants.require(unfiltered.isRow());
-        Row row = (Row) unfiltered;
+        DecoratedKey pk = JournalColumns.decorate(key);
+        OpOrder.Group readOrder = cfs.readOrdering.start();
+        List<UnfilteredRowIterator> iters = new ArrayList<>(3);
+        try
+        {
+            ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, pk));
+            for (SSTableReader sstable : view.sstables)
+            {
+                if (!sstable.mayContainAssumingKeyIsInRange(pk))
+                    continue;
 
-        long descriptor = LongType.instance.compose(ByteBuffer.wrap((byte[]) row.clustering().get(0)));
-        int position = Int32Type.instance.compose(ByteBuffer.wrap((byte[]) row.clustering().get(1)));
-        into.key = key;
-        into.value = row.getCell(recordColumn).buffer();
-        into.userVersion = Int32Type.instance.compose(row.getCell(versionColumn).buffer());
+                UnfilteredRowIterator iter = StorageHook.instance.makeRowIterator(cfs, sstable, pk, Slices.ALL, ColumnFilter.all(cfs.metadata()), false, NOOP_LISTENER);
+                if (iter.getClass() != EmptyIterators.EmptyUnfilteredRowIterator.class)
+                    iters.add(iter);
+            }
 
-        onEntry.accept(descriptor, position, into.key, into.value, into.userVersion);
+            return new TableKeyIterator(key, iters, iters.isEmpty() ? null : UnfilteredRowIterators.merge(iters), readOrder);
+        }
+        catch (Throwable t)
+        {
+            for (UnfilteredRowIterator iter : iters)
+            {
+                try { iter.close(); }
+                catch (Throwable t2) { t.addSuppressed(t2); }
+            }
+            try { readOrder.close(); } catch (Throwable t2) { t.addSuppressed(t2); }
+            throw t;
+        }
     }
 
     @SuppressWarnings("resource") // Auto-closeable iterator will release related resources
diff --git a/src/java/org/apache/cassandra/tools/StandaloneJournalUtil.java b/src/java/org/apache/cassandra/tools/StandaloneJournalUtil.java
index 83db8e23ce..93132c5467 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneJournalUtil.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneJournalUtil.java
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import accord.utils.Invariants;
 import org.apache.cassandra.config.AccordSpec;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.DurationSpec;
@@ -351,4 +352,55 @@ public class StandaloneJournalUtil implements Runnable
             }
         }
     }
+
+    @Command(name = "load", description = "Load item from journal")
+    public static class Load implements Runnable
+    {
+        @Option(names = {"-s", "--sstables"}, description = "Path to sstables")
+        public String sstables;
+
+        @Option(names = {"-j", "--journal-segments"}, description = "Path to journal segments")
+        public String journalSegments;
+
+        @Option(names = {"-k", "--kind"}, description = "Kind to filter by")
+        public String kind;
+
+        @Option(names = {"-c", "--command-store-id"}, description = "Command Store id")
+        public String commandStoreId;
+
+        public void run()
+        {
+            if (sstables == null && journalSegments == null)
+                throw new IllegalArgumentException("Either --sstables or --journal-segments must be provided");
+
+            if (journalSegments == null)
+            {
+                try
+                {
+                    journalSegments = Files.createTempDirectory("dump_journal").toString();
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            setAccordJournalDirectory(journalSegments);
+            Keyspace.setInitialized();
+            AccordJournal journal = new AccordJournal(new AccordSpec.JournalSpec().setFlushPeriod(new DurationSpec.IntMillisecondsBound("1500ms")), new File(journalSegments).parent(), Keyspace.open(SchemaConstants.ACCORD_KEYSPACE_NAME).getColumnFamilyStore(AccordKeyspace.JOURNAL));
+
+            Keyspace ks = Schema.instance.getKeyspaceInstance("system_accord");
+            ColumnFamilyStore cfs = ks.getColumnFamilyStore("journal");
+            if (sstables != null)
+                cfs.importNewSSTables(Collections.singleton(sstables), false, false, false, false, false, false, true);
+
+            journal.start(null);
+            if (JournalKey.Type.REDUNDANT_BEFORE.toString().equals(kind))
+            {
+                Invariants.require(commandStoreId != null);
+                int commandStoreId = Integer.parseInt(this.commandStoreId);
+                output.out.println(journal.loadRedundantBefore(commandStoreId));
+            }
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
index 0ca992b642..2c840d4a7d 100644
--- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
+++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
-import ch.qos.logback.classic.LoggerContext;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 
@@ -274,8 +273,6 @@ public final class JVMStabilityInspector
 
             if (doExit && killing.compareAndSet(false, true))
             {
-                if (LoggerFactory.getILoggerFactory() instanceof LoggerContext)
-                    ((LoggerContext) LoggerFactory.getILoggerFactory()).stop();
                 StorageService.instance.removeShutdownHook();
                 System.exit(100);
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to