This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 42f57fb3dc54f72616f3c444ed1a0cc5f4240366 Author: Alex Petrov <[email protected]> AuthorDate: Wed Jan 22 13:33:19 2025 +0100 Avoid double loading in Cassandra side of journal; make sure to include records appended to journal. Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20244 --- modules/accord | 2 +- src/java/org/apache/cassandra/journal/Journal.java | 172 ++++++++++++++++----- .../org/apache/cassandra/journal/OnDiskIndex.java | 12 +- .../org/apache/cassandra/journal/Segments.java | 6 + .../service/accord/AccordCommandStore.java | 58 ++----- .../service/accord/AccordCommandStores.java | 2 +- .../cassandra/service/accord/AccordJournal.java | 89 +++-------- .../service/accord/AccordJournalTable.java | 150 ++++++------------ .../org/apache/cassandra/journal/IndexTest.java | 4 +- .../apache/cassandra/utils/AccordGenerators.java | 2 + 10 files changed, 237 insertions(+), 260 deletions(-) diff --git a/modules/accord b/modules/accord index cd7f49564a..96ec342423 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit cd7f49564a5ad053286453d10f8cd46b8d870c4f +Subproject commit 96ec3424230bedf40b834785014a366716df8f42 diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java index cd7bb53d07..6f0fdabfd3 100644 --- a/src/java/org/apache/cassandra/journal/Journal.java +++ b/src/java/org/apache/cassandra/journal/Journal.java @@ -17,20 +17,21 @@ */ package org.apache.cassandra.journal; -import java.io.Closeable; import java.io.IOException; import java.nio.channels.ClosedByInterruptException; import java.nio.file.FileStore; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Iterator; import java.util.List; -import java.util.PriorityQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import 
java.util.concurrent.locks.LockSupport; import java.util.function.BooleanSupplier; import java.util.function.Function; +import java.util.function.LongConsumer; import java.util.function.Predicate; import java.util.zip.CRC32; @@ -51,8 +52,11 @@ import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.PathUtils; import org.apache.cassandra.journal.Segments.ReferencedSegments; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.Crc; import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.LazyToString; +import org.apache.cassandra.utils.MergeIterator; import org.apache.cassandra.utils.Simulate; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.concurrent.WaitQueue; @@ -262,11 +266,11 @@ public class Journal<K, V> implements Shutdownable compactor.shutdown(); compactor.awaitTermination(1, TimeUnit.MINUTES); flusher.shutdown(); - closer.shutdown(); + closeAllSegments(); releaser.shutdown(); + closer.shutdown(); closer.awaitTermination(1, TimeUnit.MINUTES); releaser.awaitTermination(1, TimeUnit.MINUTES); - closeAllSegments(); metrics.deregister(); Invariants.require(state.compareAndSet(State.SHUTDOWN, State.TERMINATED), "Unexpected journal state while trying to shut down", state); @@ -921,64 +925,156 @@ public class Journal<K, V> implements Shutdownable void write(DataOutputPlus out, int userVersion) throws IOException; } - public StaticSegmentIterator staticSegmentIterator() + /** + * Static segment iterator iterates all keys in _static_ segments in order. + */ + public StaticSegmentKeyIterator staticSegmentKeyIterator() { - return new StaticSegmentIterator(); + return new StaticSegmentKeyIterator(); } /** - * Static segment iterator iterates all _static_ segments in _key_ order. 
+ * List of key and a list of segment descriptors referencing this key */ - public class StaticSegmentIterator implements Closeable + public static class KeyRefs<K> { - // TODO (expected): use MergeIterator - private final PriorityQueue<StaticSegment.KeyOrderReader<K>> readers; - private final ReferencedSegments<K, V> segments; + long segments[]; + K key; + int size; - private StaticSegmentIterator() + public KeyRefs(K key) { - this.segments = selectAndReference(Segment::isStatic); - this.readers = new PriorityQueue<>(); - for (Segment<K, V> segment : this.segments.all()) - { - StaticSegment<K, V> staticSegment = (StaticSegment<K, V>)segment; - StaticSegment.KeyOrderReader<K> reader = staticSegment.keyOrderReader(); - if (reader.advance()) - this.readers.add(reader); - else - reader.close(); - } + this.key = key; + } + + private KeyRefs(int maxSize) + { + this.segments = new long[maxSize]; + } + + public void segments(LongConsumer consumer) + { + for (int i = 0; i < size; i++) + consumer.accept(segments[i]); } public K key() { - StaticSegment.KeyOrderReader<K> reader = readers.peek(); - if (reader == null) - return null; - return reader.key(); + return key; + } + + private void add(K key, long segment) + { + this.key = key; + if (size == 0 || segments[size - 1] < segment) + segments[size++] = segment; + else + Invariants.require(segments[size - 1] == segment, + "Tried to add an out-of-order segment: %d, %s", segment, + LazyToString.lazy(() -> Arrays.toString(Arrays.copyOf(segments, size)))); } - public void readAllForKey(K key, RecordConsumer<K> reader) + private void reset() + { + key = null; + size = 0; + Arrays.fill(segments, 0); + } + } + + public class StaticSegmentKeyIterator implements CloseableIterator<KeyRefs<K>> + { + private final ReferencedSegments<K, V> segments; + private final MergeIterator<Head, KeyRefs<K>> iterator; + + public StaticSegmentKeyIterator() { - while (true) + this.segments = selectAndReference(Segment::isStatic); + List<Iterator<Head>> 
iterators = new ArrayList<>(segments.count()); + + for (Segment<K, V> segment : segments.allSorted(true)) { - StaticSegment.KeyOrderReader<K> next = readers.peek(); - if (next == null || !next.key().equals(key)) - break; - Invariants.require(next == readers.poll()); + StaticSegment<K, V> staticSegment = (StaticSegment<K, V>) segment; + Iterator<K> iter = staticSegment.index().reader(); + Head head = new Head(staticSegment.descriptor.timestamp); + iterators.add(new Iterator<>() + { + public boolean hasNext() + { + return iter.hasNext(); + } - reader.accept(next.descriptor.timestamp, next.offset, next.key(), next.record(), next.descriptor.userVersion); - if (next.advance()) - readers.add(next); - else - next.close(); + public Head next() + { + head.key = iter.next(); + return head; + } + }); } + + this.iterator = MergeIterator.get(iterators, + (r1, r2) -> { + int keyCmp = keySupport.compare(r1.key, r2.key); + if (keyCmp != 0) + return keyCmp; + return Long.compare(r1.segment, r2.segment); + }, + new MergeIterator.Reducer<Head, KeyRefs<K>>() + { + final KeyRefs<K> ret = new KeyRefs<>(segments.count()); + + @Override + public void reduce(int idx, Head head) + { + ret.add(head.key, head.segment); + } + + @Override + protected KeyRefs<K> getReduced() + { + return ret; + } + + @Override + protected void onKeyChange() + { + ret.reset(); + super.onKeyChange(); + } + }); } + @Override public void close() { segments.close(); } + + public KeyRefs<K> peek() + { + if (iterator.hasNext()) + return iterator.peek(); + return null; + } + + @Override + public boolean hasNext() + { + return iterator.hasNext(); + } + + @Override + public KeyRefs<K> next() + { + return iterator.next(); + } + + class Head + { + final long segment; + K key; + Head(long segment) { this.segment = segment; } + } } enum State diff --git a/src/java/org/apache/cassandra/journal/OnDiskIndex.java b/src/java/org/apache/cassandra/journal/OnDiskIndex.java index 17b8a7b791..8c662d889a 100644 --- 
a/src/java/org/apache/cassandra/journal/OnDiskIndex.java +++ b/src/java/org/apache/cassandra/journal/OnDiskIndex.java @@ -24,7 +24,6 @@ import java.nio.file.StandardOpenOption; import java.util.Map; import java.util.NavigableMap; import java.util.zip.CRC32; - import javax.annotation.Nullable; import accord.utils.Invariants; @@ -32,6 +31,7 @@ import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.Crc; import static org.apache.cassandra.journal.Journal.validateCRC; @@ -244,7 +244,7 @@ final class OnDiskIndex<K> extends Index<K> return new IndexReader(); } - public class IndexReader + public class IndexReader extends AbstractIterator<K> { int idx; K key; @@ -256,10 +256,12 @@ final class OnDiskIndex<K> extends Index<K> idx = -1; } - public K key() + protected K computeNext() { - ensureAdvanced(); - return key; + if (advance()) + return key; + else + return endOfData(); } public int offset() diff --git a/src/java/org/apache/cassandra/journal/Segments.java b/src/java/org/apache/cassandra/journal/Segments.java index cb38a489ca..a2475557a6 100644 --- a/src/java/org/apache/cassandra/journal/Segments.java +++ b/src/java/org/apache/cassandra/journal/Segments.java @@ -206,6 +206,12 @@ class Segments<K, V> this.refs = refs; } + public int count() + { + if (refs == null) return 0; + else return refs.size(); + } + @Override public void close() { diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java index c87cd41baf..8086cecdbc 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java @@ -45,14 +45,12 @@ import accord.local.Command; import 
accord.local.CommandStore; import accord.local.CommandStores; import accord.local.Commands; -import accord.local.KeyHistory; import accord.local.NodeCommandStoreService; import accord.local.PreLoadContext; import accord.local.RedundantBefore; import accord.local.SafeCommandStore; import accord.local.cfk.CommandsForKey; import accord.primitives.PartialTxn; -import accord.primitives.Participants; import accord.primitives.RangeDeps; import accord.primitives.Ranges; import accord.primitives.RoutableKey; @@ -70,11 +68,6 @@ import static accord.api.Journal.CommandUpdate; import static accord.api.Journal.FieldUpdates; import static accord.api.Journal.Load.MINIMAL; import static accord.api.Journal.Loader; -import static accord.api.Journal.OnDone; -import static accord.local.KeyHistory.SYNC; -import static accord.primitives.Status.Committed; -import static accord.primitives.Status.PreCommitted; -import static accord.primitives.Status.Truncated; import static accord.utils.Invariants.require; public class AccordCommandStore extends CommandStore @@ -188,9 +181,9 @@ public class AccordCommandStore extends CommandStore maybeLoadRangesForEpoch(journal.loadRangesForEpoch(id())); } - static Factory factory(Journal journal, IntFunction<AccordExecutor> executorFactory) + static Factory factory(IntFunction<AccordExecutor> executorFactory) { - return (id, node, agent, dataStore, progressLogFactory, listenerFactory, rangesForEpoch) -> + return (id, node, agent, dataStore, progressLogFactory, listenerFactory, rangesForEpoch, journal) -> new AccordCommandStore(id, node, agent, dataStore, progressLogFactory, listenerFactory, rangesForEpoch, journal, executorFactory.apply(id)); } @@ -489,48 +482,15 @@ public class AccordCommandStore extends CommandStore this.store = store; } - private PreLoadContext context(Command command, KeyHistory keyHistory) - { - TxnId txnId = command.txnId(); - Participants<?> keys = null; - if (CommandsForKey.manages(txnId)) - keys = command.hasBeen(Committed) ? 
command.participants().hasTouched() : command.participants().touches(); - else if (!CommandsForKey.managesExecution(txnId) && command.hasBeen(PreCommitted) && !command.hasBeen(Truncated)) - keys = command.asCommitted().waitingOn.keys; - - if (keys != null) - return PreLoadContext.contextFor(txnId, keys, keyHistory); - - return txnId; - } - - @Override - public void load(Command command, OnDone onDone) - { - store.execute(context(command, SYNC), safeStore -> loadInternal(command, safeStore)) - .begin((unused, throwable) -> { - if (throwable != null) - onDone.failure(throwable); - else - onDone.success(); - }); - } - @Override - public void apply(Command command, OnDone onDone) + public AsyncChain<Command> load(TxnId txnId) { - PreLoadContext context = context(command, SYNC); - store.execute(context, safeStore -> { - applyWrites(command.txnId(), safeStore, (safeCommand, cmd) -> { - Commands.applyWrites(safeStore, context, cmd).begin(store.agent); - }); - }) - .begin((unused, throwable) -> { - if (throwable != null) - onDone.failure(throwable); - else - onDone.success(); - }); + return store.submit(txnId, safeStore -> { + maybeApplyWrites(txnId, safeStore, (safeCommand, cmd) -> { + Commands.applyWrites(safeStore, txnId, cmd).begin(store.agent); + }); + return safeStore.unsafeGet(txnId).current(); + }); } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java index 7188628772..37beb2c4fb 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java @@ -71,7 +71,7 @@ public class AccordCommandStores extends CommandStores implements CacheSize Journal journal, AccordExecutor[] executors) { super(node, agent, store, random, journal, shardDistributor, progressLogFactory, listenerFactory, - AccordCommandStore.factory(journal, id -> executors[id % executors.length])); + 
AccordCommandStore.factory(id -> executors[id % executors.length])); this.executors = executors; this.cacheSizeMetrics = new CacheSizeMetrics(ACCORD_STATE_CACHE, this); cacheSize = DatabaseDescriptor.getAccordCacheSizeInMiB() << 20; diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index 9be8739416..fd435a90d7 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -25,7 +25,6 @@ import java.util.Iterator; import java.util.NavigableMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.BiConsumer; import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; @@ -48,13 +47,13 @@ import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.utils.Invariants; import accord.utils.PersistentField; +import accord.utils.async.AsyncChains; import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; import org.apache.cassandra.concurrent.Shutdownable; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; @@ -74,13 +73,13 @@ import org.apache.cassandra.service.accord.serializers.CommandSerializers.Execut import org.apache.cassandra.service.accord.serializers.DepsSerializers; import org.apache.cassandra.service.accord.serializers.ResultSerializers; import org.apache.cassandra.service.accord.serializers.WaitingOnSerializer; +import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.ExecutorUtils; -import 
org.apache.cassandra.utils.concurrent.AsyncPromise; import static accord.impl.CommandChange.anyFieldChanged; -import static accord.impl.CommandChange.isNull; import static accord.impl.CommandChange.getFlags; import static accord.impl.CommandChange.isChanged; +import static accord.impl.CommandChange.isNull; import static accord.impl.CommandChange.nextSetField; import static accord.impl.CommandChange.setChanged; import static accord.impl.CommandChange.setFieldIsNull; @@ -90,7 +89,6 @@ import static accord.impl.CommandChange.validateFlags; import static accord.local.Cleanup.Input.FULL; import static accord.primitives.SaveStatus.Erased; import static accord.primitives.SaveStatus.Vestigial; -import static accord.primitives.Status.Truncated; import static org.apache.cassandra.service.accord.AccordJournalValueSerializers.DurableBeforeAccumulator; public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier, Shutdownable @@ -439,55 +437,34 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier journalTable.forceCompaction(); } - @Override + @SuppressWarnings("unchecked") @Override public void replay(CommandStores commandStores) { journal.closeCurrentSegmentForTestingIfNonEmpty(); - try (AccordJournalTable.KeyOrderIterator<JournalKey> iter = journalTable.readAll()) + try (CloseableIterator<Journal.KeyRefs<JournalKey>> iter = journalTable.keyIterator()) { - JournalKey key; - Builder builder = new Builder(); - - while ((key = iter.key()) != null) + while (iter.hasNext()) { - builder.reset(key.id); - if (key.type != JournalKey.Type.COMMAND_DIFF) - { - // TODO (required): add "skip" for the key to avoid getting stuck - iter.readAllForKey(key, (segment, position, key1, buffer, userVersion) -> {}); + Journal.KeyRefs<JournalKey> ref = iter.next(); + + if (ref.key().type != JournalKey.Type.COMMAND_DIFF) continue; - } - JournalKey finalKey = key; - iter.readAllForKey(key, (segment, position, local, buffer, userVersion) -> { - 
Invariants.require(finalKey.equals(local)); - try (DataInputBuffer in = new DataInputBuffer(buffer, false)) - { - builder.deserializeNext(in, userVersion); - if (journalTable.shouldIndex(finalKey) - && builder.participants() != null - && builder.participants().route() != null) - journalTable.safeNotify(index -> - index.update(segment, finalKey.commandStoreId, finalKey.id, builder.participants().route())); - } - catch (IOException e) - { - // can only throw if serializer is buggy - throw new RuntimeException(e); - } - }); - - if (!builder.isEmpty()) - { - CommandStore commandStore = commandStores.forId(key.commandStoreId); - Command command = builder.construct(commandStore.unsafeGetRedundantBefore()); - Invariants.require(command.saveStatus() != SaveStatus.Uninitialised, - "Found uninitialized command in the log: %s %s", command.toString(), builder.toString()); - Loader loader = commandStore.loader(); - async(loader::load, command).get(); - if (command.saveStatus().compareTo(SaveStatus.Stable) >= 0 && !command.hasBeen(Truncated)) - async(loader::apply, command).get(); - } + CommandStore commandStore = commandStores.forId(ref.key().commandStoreId); + Loader loader = commandStore.loader(); + AsyncChains.getUnchecked(loader.load(ref.key().id) + .map(command -> { + if (journalTable.shouldIndex(ref.key()) + && command.participants() != null + && command.participants().route() != null) + { + ref.segments(segment -> { + journalTable.safeNotify(index -> index.update(segment, ref.key().commandStoreId, ref.key().id, command.participants().route())); + }); + } + return command; + }) + .beginAsResult()); } } catch (Throwable t) @@ -496,24 +473,6 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier } } - private AsyncPromise<?> async(BiConsumer<Command, OnDone> consumer, Command command) - { - AsyncPromise<?> future = new AsyncPromise<>(); - consumer.accept(command, new OnDone() - { - public void success() - { - future.setSuccess(null); - } - - 
public void failure(Throwable t) - { - future.setFailure(t); - } - }); - return future; - } - public static @Nullable ByteBuffer asSerializedChange(Command before, Command after, int userVersion) throws IOException { try (DataOutputBuffer out = new DataOutputBuffer()) diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java index 95267a6b79..f4c96fa122 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java @@ -17,16 +17,16 @@ */ package org.apache.cassandra.service.accord; -import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.function.Consumer; - +import javax.annotation.CheckForNull; import javax.annotation.Nullable; +import com.google.common.collect.AbstractIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,11 +153,11 @@ public class AccordJournalTable<K extends JournalKey, V> implements RangeSearche void read(DataInputPlus input, int userVersion) throws IOException; } - private abstract class AbstractRecordConsumer implements RecordConsumer<K> + private class RecordConsumerAdapter implements RecordConsumer<K> { protected final Reader reader; - AbstractRecordConsumer(Reader reader) + RecordConsumerAdapter(Reader reader) { this.reader = reader; } @@ -169,13 +169,13 @@ public class AccordJournalTable<K extends JournalKey, V> implements RangeSearche } } - private class TableRecordConsumer extends AbstractRecordConsumer + private class TableRecordConsumer implements RecordConsumer<K> { protected LongHashSet visited = null; - - TableRecordConsumer(Reader reader) + protected RecordConsumer<K> delegate; + TableRecordConsumer(RecordConsumer<K> delegate) { - super(reader); + this.delegate = delegate; } void visit(long segment) @@ 
-194,20 +194,20 @@ public class AccordJournalTable<K extends JournalKey, V> implements RangeSearche public void accept(long segment, int position, K key, ByteBuffer buffer, int userVersion) { visit(segment); - super.accept(segment, position, key, buffer, userVersion); + delegate.accept(segment, position, key, buffer, userVersion); } } - private class JournalAndTableRecordConsumer extends AbstractRecordConsumer + private class JournalAndTableRecordConsumer implements RecordConsumer<K> { private final K key; private final TableRecordConsumer tableRecordConsumer; - - JournalAndTableRecordConsumer(K key, Reader reader) + private final RecordConsumer<K> delegate; + JournalAndTableRecordConsumer(K key, RecordConsumer<K> reader) { - super(reader); this.key = key; this.tableRecordConsumer = new TableRecordConsumer(reader); + this.delegate = reader; } void readTable() @@ -219,7 +219,7 @@ public class AccordJournalTable<K extends JournalKey, V> implements RangeSearche public void accept(long segment, int position, K key, ByteBuffer buffer, int userVersion) { if (!tableRecordConsumer.visited(segment)) - super.accept(segment, position, key, buffer, userVersion); + delegate.accept(segment, position, key, buffer, userVersion); } } @@ -340,6 +340,11 @@ public class AccordJournalTable<K extends JournalKey, V> implements RangeSearche * When reading from journal segments, skip descriptors that were read from the table. 
*/ public void readAll(K key, Reader reader, boolean asc) + { + readAll(key, new RecordConsumerAdapter(reader), asc); + } + + public void readAll(K key, RecordConsumer<K> reader, boolean asc) { JournalAndTableRecordConsumer consumer = new JournalAndTableRecordConsumer(key, reader); journal.readAll(key, consumer, asc); @@ -386,19 +391,16 @@ public class AccordJournalTable<K extends JournalKey, V> implements RangeSearche } @SuppressWarnings("resource") // Auto-closeable iterator will release related resources - public KeyOrderIterator<K> readAll() + public CloseableIterator<Journal.KeyRefs<K>> keyIterator() { return new JournalAndTableKeyIterator(); } - private class TableIterator implements Closeable + private class TableIterator extends AbstractIterator<K> implements CloseableIterator<K> { private final UnfilteredPartitionIterator mergeIterator; private final RefViewFragment view; - private UnfilteredRowIterator partition; - private LongHashSet visited = null; - private TableIterator() { view = cfs.selectAndReference(v -> v.select(SSTableSet.LIVE)); @@ -407,57 +409,24 @@ public class AccordJournalTable<K extends JournalKey, V> implements RangeSearche scanners.add(sstable.getScanner()); mergeIterator = view.sstables.isEmpty() - ? EmptyIterators.unfilteredPartition(cfs.metadata()) - : UnfilteredPartitionIterators.merge(scanners, UnfilteredPartitionIterators.MergeListener.NOOP); + ? 
EmptyIterators.unfilteredPartition(cfs.metadata()) + : UnfilteredPartitionIterators.merge(scanners, UnfilteredPartitionIterators.MergeListener.NOOP); } - public JournalKey key() + @CheckForNull + protected K computeNext() { - if (partition == null) + if (mergeIterator.hasNext()) { - if (mergeIterator.hasNext()) - partition = mergeIterator.next(); - else - return null; - } - - return AccordKeyspace.JournalColumns.getJournalKey(partition.partitionKey()); - } - - protected void readAllForKey(K key, RecordConsumer<K> recordConsumer) - { - while (partition.hasNext()) - { - EntryHolder<K> into = new EntryHolder<>(); - // TODO: use flyweight to avoid allocating extra lambdas? - readRow(key, partition.next(), into, (segment, position, key1, buffer, userVersion) -> { - visit(segment); - recordConsumer.accept(segment, position, key1, buffer, userVersion); - }); + try (UnfilteredRowIterator partition = mergeIterator.next()) + { + return (K) AccordKeyspace.JournalColumns.getJournalKey(partition.partitionKey()); + } } - - partition = null; + else + return endOfData(); } - void visit(long segment) - { - if (visited == null) - visited = new LongHashSet(); - visited.add(segment); - } - - boolean visited(long segment) - { - return visited != null && visited.contains(segment); - } - - - void clear() - { - visited = null; - } - - @Override public void close() { @@ -466,62 +435,45 @@ public class AccordJournalTable<K extends JournalKey, V> implements RangeSearche } } - private class JournalAndTableKeyIterator implements KeyOrderIterator<K> + private class JournalAndTableKeyIterator extends AbstractIterator<Journal.KeyRefs<K>> implements CloseableIterator<Journal.KeyRefs<K>> { final TableIterator tableIterator; - final Journal<K, V>.StaticSegmentIterator staticSegmentIterator; + final Journal<K, V>.StaticSegmentKeyIterator journalIterator; private JournalAndTableKeyIterator() { this.tableIterator = new TableIterator(); - this.staticSegmentIterator = journal.staticSegmentIterator(); + 
this.journalIterator = journal.staticSegmentKeyIterator(); } - @Override - public K key() + protected Journal.KeyRefs<K> computeNext() { - // TODO (expected): fix generics mismatch here - K tableKey = (K)tableIterator.key(); - K journalKey = staticSegmentIterator.key(); + K tableKey = tableIterator.hasNext() ? tableIterator.peek() : null; + Journal.KeyRefs<K> journalKey = journalIterator.hasNext() ? journalIterator.peek() : null; + if (tableKey == null) - return journalKey; - if (journalKey == null || keySupport.compare(tableKey, journalKey) > 0) - return journalKey; + return journalKey == null ? endOfData() : journalIterator.next(); - return tableKey; - } + if (journalKey == null) + return new Journal.KeyRefs<>(tableIterator.next()); - @Override - public void readAllForKey(K key, RecordConsumer<K> reader) - { - K tableKey = (K)tableIterator.key(); - K journalKey = staticSegmentIterator.key(); - if (journalKey != null && keySupport.compare(journalKey, key) == 0) - staticSegmentIterator.readAllForKey(key, (segment, position, key1, buffer, userVersion) -> { - if (!tableIterator.visited(segment)) - reader.accept(segment, position, key1, buffer, userVersion); - }); - - if (tableKey != null && keySupport.compare(tableKey, key) == 0) - tableIterator.readAllForKey(key, reader); - - tableIterator.clear(); + int cmp = keySupport.compare(tableKey, journalKey.key()); + if (cmp == 0) + { + tableIterator.next(); + return journalIterator.next(); + } + + return cmp > 0 ? 
new Journal.KeyRefs<>(tableIterator.next()) : journalIterator.next(); } public void close() { tableIterator.close(); - staticSegmentIterator.close(); + journalIterator.close(); } } - public interface KeyOrderIterator<K> extends Closeable - { - K key(); - void readAllForKey(K key, RecordConsumer<K> reader); - void close(); - } - public static void readBuffer(ByteBuffer buffer, Reader reader, int userVersion) { try (DataInputBuffer in = new DataInputBuffer(buffer, false)) diff --git a/test/unit/org/apache/cassandra/journal/IndexTest.java b/test/unit/org/apache/cassandra/journal/IndexTest.java index 43e2e21fa9..acde8cc69c 100644 --- a/test/unit/org/apache/cassandra/journal/IndexTest.java +++ b/test/unit/org/apache/cassandra/journal/IndexTest.java @@ -238,10 +238,10 @@ public class IndexTest OnDiskIndex<TimeUUID>.IndexReader iter = onDisk.reader(); Iterator<Pair<TimeUUID, Long>> expectedIter = sortedEntries.iterator(); - while (iter.advance()) + while (iter.hasNext()) { Pair<TimeUUID, Long> expected = expectedIter.next(); - Assert.assertEquals(iter.key(), expected.left); + Assert.assertEquals(iter.next(), expected.left); Assert.assertEquals(iter.recordSize(), Index.readSize(expected.right)); Assert.assertEquals(iter.offset(), Index.readOffset(expected.right)); } diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java b/test/unit/org/apache/cassandra/utils/AccordGenerators.java index 1b60d0a3b5..73e4b347ad 100644 --- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java +++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java @@ -230,6 +230,8 @@ public class AccordGenerators builder.promised(promised); if (saveStatus.status.compareTo(Status.PreAccepted) > 0) builder.acceptedOrCommitted(accepted); + else + builder.acceptedOrCommitted(Ballot.ZERO); if (saveStatus.compareTo(SaveStatus.Stable) >= 0 && !saveStatus.hasBeen(Status.Truncated)) builder.waitingOn(waitingOn); if (saveStatus.hasBeen(Status.PreApplied) && 
!saveStatus.hasBeen(Status.Truncated)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org
