This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 9ed9c7cacd Iterate entries in reverse order, as we can not rely on switch atomicity 9ed9c7cacd is described below commit 9ed9c7cacd3faac8af83b9f45d18f6c3fdc6d58c Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Thu Aug 7 13:46:06 2025 +0200 Iterate entries in reverse order, as we can not rely on switch atomicity Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20837 --- .../service/accord/AccordJournalTable.java | 200 ++++++++++----------- .../cassandra/tools/StandaloneJournalUtil.java | 52 ++++++ .../cassandra/utils/JVMStabilityInspector.java | 3 - 3 files changed, 147 insertions(+), 108 deletions(-) diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java index 0e8cff3bb5..3c97b02dcd 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java @@ -36,7 +36,6 @@ import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.utils.Invariants; import accord.utils.UncheckedInterruptedException; -import org.agrona.collections.LongHashSet; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.cql3.Operator; @@ -61,7 +60,6 @@ import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; import org.apache.cassandra.db.rows.Row; -import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.dht.Bounds; @@ -73,7 +71,6 @@ import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.journal.EntrySerializer.EntryHolder; import org.apache.cassandra.journal.Journal; import org.apache.cassandra.journal.KeySupport; import org.apache.cassandra.journal.RecordConsumer; @@ -83,10 +80,12 @@ import org.apache.cassandra.service.accord.AccordKeyspace.JournalColumns; import org.apache.cassandra.service.accord.api.TokenKey; import org.apache.cassandra.service.accord.serializers.CommandSerializers; import org.apache.cassandra.service.accord.serializers.Version; +import org.apache.cassandra.utils.Closeable; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.MergeIterator; +import org.apache.cassandra.utils.concurrent.OpOrder; import static org.apache.cassandra.io.sstable.SSTableReadsListener.NOOP_LISTENER; import static org.apache.cassandra.service.accord.AccordKeyspace.JournalColumns.getJournalKey; @@ -221,63 +220,6 @@ public class AccordJournalTable<K extends JournalKey, V> implements RangeSearche } } - // TODO (expected): this can be removed entirely when we "flush" segments directly to sstables (but we perhaps need to be careful about the active segment) - private class TableRecordConsumer implements RecordConsumer<K> - { - final LongHashSet visited; - final RecordConsumer<K> delegate; - - TableRecordConsumer(LongHashSet visited, RecordConsumer<K> delegate) - { - this.visited = visited; - this.delegate = delegate; - } - - boolean visited(long segment) - { - return visited != null && visited.contains(segment); - } - - @Override - public void accept(long segment, int position, K key, ByteBuffer buffer, int userVersion) - { - if (!visited(segment)) - delegate.accept(segment, position, key, buffer, userVersion); - } - } - - private class JournalAndTableRecordConsumer implements RecordConsumer<K> - { - private final K key; - private final RecordConsumer<K> delegate; - private LongHashSet visited; - - void visit(long segment) - { - if (visited == null) - visited = new LongHashSet(); - visited.add(segment); - } - - JournalAndTableRecordConsumer(K key, RecordConsumer<K> reader) - { - this.key = key; - this.delegate = reader; - } - - void readTable() - { - readAllFromTable(key, new TableRecordConsumer(visited, delegate)); - } - - @Override - public void accept(long segment, int position, K key, ByteBuffer buffer, int userVersion) - { - visit(segment); - delegate.accept(segment, position, key, buffer, userVersion); - } - } - /** * When using {@link PartitionRangeReadCommand} we need to work with {@link RowFilter} which works with columns. * But the index doesn't care about table based queries and needs to be queried using the fields in the index, to @@ -405,71 +347,119 @@ public class AccordJournalTable<K extends JournalKey, V> implements RangeSearche */ public void readAll(K key, Reader reader) { - readAll(key, new RecordConsumerAdapter(reader)); + readAll(key, new RecordConsumerAdapter<>(reader)); } public void readAll(K key, RecordConsumer<K> reader) { - JournalAndTableRecordConsumer consumer = new JournalAndTableRecordConsumer(key, reader); - journal.readAll(key, consumer); - consumer.readTable(); + try (TableKeyIterator table = readAllFromTable(key)) + { + boolean hasTableData = table.advance(); + long minSegment = hasTableData ? table.segment : Long.MIN_VALUE; + // First, read all journal entries newer than anything flushed into sstables + journal.readAll(key, (segment, position, key1, buffer, userVersion) -> { + if (segment > minSegment) + reader.accept(segment, position, key1, buffer, userVersion); + }); + + // Then, read SSTables + while (hasTableData) + { + reader.accept(table.segment, table.offset, key, table.value, table.userVersion); + hasTableData = table.advance(); + } + } } - - private void readAllFromTable(K key, TableRecordConsumer onEntry) + + // TODO (expected): why are recordColumn and versionColumn instance fields, so that this cannot be a static class? + class TableKeyIterator implements Closeable, RecordConsumer<K> { - DecoratedKey pk = JournalColumns.decorate(key); - try (RefViewFragment view = cfs.selectAndReference(View.select(SSTableSet.LIVE, pk))) + final K key; + final List<UnfilteredRowIterator> unmerged; + final UnfilteredRowIterator merged; + final OpOrder.Group readOrder; + + long segment; + int offset; + ByteBuffer value; + int userVersion; + + TableKeyIterator(K key, List<UnfilteredRowIterator> unmerged, UnfilteredRowIterator merged, OpOrder.Group readOrder) { - if (view.sstables.isEmpty()) - return; + this.key = key; + this.unmerged = unmerged; + this.merged = merged; + this.readOrder = readOrder; + } - List<UnfilteredRowIterator> iters = new ArrayList<>(Math.min(4, view.sstables.size())); - try - { - for (SSTableReader sstable : view.sstables) - { - if (!sstable.mayContainAssumingKeyIsInRange(pk)) - continue; + @Override + public void accept(long segment, int offset, K key, ByteBuffer buffer, int userVersion) + { + this.segment = segment; + this.offset = offset; + this.value = buffer; + this.userVersion = userVersion; + } - UnfilteredRowIterator iter = StorageHook.instance.makeRowIterator(cfs, sstable, pk, Slices.ALL, ColumnFilter.all(cfs.metadata()), false, NOOP_LISTENER); - if (iter.getClass() != EmptyIterators.EmptyUnfilteredRowIterator.class) - iters.add(iter); - } + boolean advance() + { + if (merged == null || !merged.hasNext()) + return false; - if (!iters.isEmpty()) - { - EntryHolder<K> into = new EntryHolder<>(); - try (UnfilteredRowIterator iter = UnfilteredRowIterators.merge(iters)) - { - while (iter.hasNext()) readRow(key, iter.next(), into, onEntry); - } - } + try + { + Row row = (Row) merged.next(); + segment = LongType.instance.compose(ByteBuffer.wrap((byte[]) row.clustering().get(0))); + offset = Int32Type.instance.compose(ByteBuffer.wrap((byte[]) row.clustering().get(1))); + value = row.getCell(recordColumn).buffer(); + userVersion = Int32Type.instance.compose(row.getCell(versionColumn).buffer()); + return true; } catch (Throwable t) { - String message = "Failed to read from " + iters; - for (UnfilteredRowIterator iter : iters) - { - try { iter.close(); } - catch (Throwable t2) { t.addSuppressed(t2); } - } - throw new FSReadError(message, t); + throw new FSReadError("Failed to read from " + unmerged, t); } } + + @Override + public void close() + { + readOrder.close(); + if (merged != null) + merged.close(); + } } - private void readRow(K key, Unfiltered unfiltered, EntryHolder<K> into, RecordConsumer<K> onEntry) + private TableKeyIterator readAllFromTable(K key) { - Invariants.require(unfiltered.isRow()); - Row row = (Row) unfiltered; + DecoratedKey pk = JournalColumns.decorate(key); + OpOrder.Group readOrder = cfs.readOrdering.start(); + List<UnfilteredRowIterator> iters = new ArrayList<>(3); + try + { + ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, pk)); + for (SSTableReader sstable : view.sstables) + { + if (!sstable.mayContainAssumingKeyIsInRange(pk)) + continue; - long descriptor = LongType.instance.compose(ByteBuffer.wrap((byte[]) row.clustering().get(0))); - int position = Int32Type.instance.compose(ByteBuffer.wrap((byte[]) row.clustering().get(1))); - into.key = key; - into.value = row.getCell(recordColumn).buffer(); - into.userVersion = Int32Type.instance.compose(row.getCell(versionColumn).buffer()); + UnfilteredRowIterator iter = StorageHook.instance.makeRowIterator(cfs, sstable, pk, Slices.ALL, ColumnFilter.all(cfs.metadata()), false, NOOP_LISTENER); + if (iter.getClass() != EmptyIterators.EmptyUnfilteredRowIterator.class) + iters.add(iter); + } - onEntry.accept(descriptor, position, into.key, into.value, into.userVersion); + return new TableKeyIterator(key, iters, iters.isEmpty() ? null : UnfilteredRowIterators.merge(iters), readOrder); + } + catch (Throwable t) + { + readOrder.close(); + for (UnfilteredRowIterator iter : iters) + { + try { iter.close(); } + catch (Throwable t2) { t.addSuppressed(t2); } + } + throw t; + } } @SuppressWarnings("resource") // Auto-closeable iterator will release related resources diff --git a/src/java/org/apache/cassandra/tools/StandaloneJournalUtil.java b/src/java/org/apache/cassandra/tools/StandaloneJournalUtil.java index 83db8e23ce..93132c5467 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneJournalUtil.java +++ b/src/java/org/apache/cassandra/tools/StandaloneJournalUtil.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import accord.utils.Invariants; import org.apache.cassandra.config.AccordSpec; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.DurationSpec; @@ -351,4 +352,55 @@ public class StandaloneJournalUtil implements Runnable } } } + + @Command(name = "load", description = "Load item from journal") + public static class Load implements Runnable + { + @Option(names = {"-s", "--sstables"}, description = "Path to sstables") + public String sstables; + + @Option(names = {"-j", "--journal-segments"}, description = "Path to journal segments") + public String journalSegments; + + @Option(names = {"-k", "--kind"}, description = "Kind to filter by") + public String kind; + + @Option(names = {"-c", "--command-store-id"}, description = "Command Store id") + public String commandStoreId; + + public void run() + { + if (sstables == null && journalSegments == null) + throw new IllegalArgumentException("Either --sstables or --journal-segments must be provided"); + + if (journalSegments == null) + { + try + { + journalSegments = Files.createTempDirectory("dump_journal").getFileName().toString(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + setAccordJournalDirectory(journalSegments); + Keyspace.setInitialized(); + AccordJournal journal = new AccordJournal(new AccordSpec.JournalSpec().setFlushPeriod(new DurationSpec.IntMillisecondsBound("1500ms")), new File(journalSegments).parent(), Keyspace.open(SchemaConstants.ACCORD_KEYSPACE_NAME).getColumnFamilyStore(AccordKeyspace.JOURNAL)); + + Keyspace ks = Schema.instance.getKeyspaceInstance("system_accord"); + ColumnFamilyStore cfs = ks.getColumnFamilyStore("journal"); + if (sstables != null) + cfs.importNewSSTables(Collections.singleton(sstables), false, false, false, false, false, false, true); + + journal.start(null); + if (kind.toString().equals(JournalKey.Type.REDUNDANT_BEFORE.toString())) + { + Invariants.require(commandStoreId != null); + int commandStoreId = Integer.parseInt(this.commandStoreId); + output.out.println(journal.loadRedundantBefore(commandStoreId)); + } + } + } } diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java index 0ca992b642..2c840d4a7d 100644 --- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java +++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java @@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; -import ch.qos.logback.classic.LoggerContext; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; @@ -274,8 +273,6 @@ public final class JVMStabilityInspector if (doExit && killing.compareAndSet(false, true)) { - if (LoggerFactory.getILoggerFactory() instanceof LoggerContext) - ((LoggerContext) LoggerFactory.getILoggerFactory()).stop(); StorageService.instance.removeShutdownHook(); System.exit(100); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org