This is an automated email from the ASF dual-hosted git repository.
aleksey pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 2efa43ff3e6 Remove Journal record ownership tags functionality
2efa43ff3e6 is described below
commit 2efa43ff3e60fa81abd2f0e90ca3f42bc7c5307f
Author: Aleksey Yeschenko <[email protected]>
AuthorDate: Wed Jan 15 15:37:40 2025 +0000
Remove Journal record ownership tags functionality
patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for
CASSANDRA-20229
---
.../apache/cassandra/journal/ActiveSegment.java | 27 +++---
.../apache/cassandra/journal/EntrySerializer.java | 64 +++++----------
src/java/org/apache/cassandra/journal/Journal.java | 35 ++++----
.../org/apache/cassandra/journal/Metadata.java | 95 ++--------------------
.../apache/cassandra/journal/RecordConsumer.java | 4 +-
src/java/org/apache/cassandra/journal/Segment.java | 4 +-
.../apache/cassandra/journal/StaticSegment.java | 9 +-
.../cassandra/service/accord/AccordJournal.java | 11 +--
.../service/accord/AccordJournalTable.java | 24 +++---
.../test/AccordJournalSimulationTest.java | 3 +-
.../org/apache/cassandra/journal/JournalTest.java | 12 ++-
.../org/apache/cassandra/journal/MetadataTest.java | 54 ++----------
.../org/apache/cassandra/journal/SegmentTest.java | 78 +++---------------
13 files changed, 99 insertions(+), 321 deletions(-)
diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java
b/src/java/org/apache/cassandra/journal/ActiveSegment.java
index 9e7d87b6e73..fb5245de9b1 100644
--- a/src/java/org/apache/cassandra/journal/ActiveSegment.java
+++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java
@@ -22,12 +22,12 @@ import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
-import java.util.*;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.LockSupport;
import com.codahale.metrics.Timer;
+import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -362,9 +362,9 @@ public final class ActiveSegment<K, V> extends Segment<K, V>
*/
@SuppressWarnings({ "resource", "RedundantSuppression" }) // op group will be closed by Allocation#write()
- Allocation allocate(int entrySize, Set<Integer> hosts)
+ Allocation allocate(int entrySize)
{
- int totalSize = totalEntrySize(hosts, entrySize);
+ int totalSize = totalEntrySize(entrySize);
OpOrder.Group opGroup = appendOrder.start();
try
{
@@ -383,11 +383,12 @@ public final class ActiveSegment<K, V> extends Segment<K,
V>
}
}
- private int totalEntrySize(Set<Integer> hosts, int recordSize)
+ private int totalEntrySize(int recordSize)
{
- return EntrySerializer.fixedEntrySize(keySupport,
descriptor.userVersion)
- + EntrySerializer.variableEntrySize(hosts.size())
- + recordSize;
+ return EntrySerializer.headerSize(keySupport, descriptor.userVersion)
+ + recordSize
+ + TypeSizes.INT_SIZE // CRC
+ ;
}
// allocate bytes in the segment, or return -1 if not enough space
@@ -437,12 +438,12 @@ public final class ActiveSegment<K, V> extends Segment<K,
V>
this.length = length;
}
- void write(K id, ByteBuffer record, Set<Integer> hosts)
+ void write(K id, ByteBuffer record)
{
try
{
- EntrySerializer.write(id, record, hosts, keySupport, buffer,
descriptor.userVersion);
- metadata.update(hosts);
+ EntrySerializer.write(id, record, keySupport, buffer,
descriptor.userVersion);
+ metadata.update();
index.update(id, start, length);
}
catch (IOException e)
@@ -456,13 +457,13 @@ public final class ActiveSegment<K, V> extends Segment<K,
V>
}
// Variant of write that does not allocate/return a record pointer
- void writeInternal(K id, ByteBuffer record, Set<Integer> hosts)
+ void writeInternal(K id, ByteBuffer record)
{
try
{
- EntrySerializer.write(id, record, hosts, keySupport, buffer,
descriptor.userVersion);
+ EntrySerializer.write(id, record, keySupport, buffer,
descriptor.userVersion);
index.update(id, start, length);
- metadata.update(hosts);
+ metadata.update();
}
catch (IOException e)
{
diff --git a/src/java/org/apache/cassandra/journal/EntrySerializer.java
b/src/java/org/apache/cassandra/journal/EntrySerializer.java
index a90cfa38f0e..d991e5088eb 100644
--- a/src/java/org/apache/cassandra/journal/EntrySerializer.java
+++ b/src/java/org/apache/cassandra/journal/EntrySerializer.java
@@ -20,11 +20,9 @@ package org.apache.cassandra.journal;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Set;
import java.util.zip.CRC32;
import accord.utils.Invariants;
-import org.agrona.collections.IntHashSet;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Crc;
@@ -35,7 +33,6 @@ public final class EntrySerializer
{
static <K> void write(K key,
ByteBuffer record,
- Set<Integer> hosts,
KeySupport<K> keySupport,
ByteBuffer out,
int userVersion)
@@ -43,17 +40,13 @@ public final class EntrySerializer
{
int start = out.position();
int totalSize = out.getInt() - start;
- Invariants.checkState(totalSize == out.remaining() +
TypeSizes.INT_SIZE);
- Invariants.checkState(totalSize == record.remaining() +
fixedEntrySize(keySupport, userVersion) + variableEntrySize(hosts.size()));
+ Invariants.checkState(totalSize == TypeSizes.INT_SIZE +
out.remaining());
+ Invariants.checkState(totalSize == headerSize(keySupport, userVersion)
+ record.remaining() + TypeSizes.INT_SIZE);
keySupport.serialize(key, out, userVersion);
- out.putShort((short)hosts.size());
- int fixedCrcPosition = out.position();
- out.position(fixedCrcPosition + TypeSizes.INT_SIZE);
-
- for (int host : hosts)
- out.putInt(host);
+ int headerCrcPosition = out.position();
+ out.position(headerCrcPosition + TypeSizes.INT_SIZE);
int recordSize = record.remaining();
int recordEnd = out.position() + recordSize;
@@ -63,7 +56,7 @@ public final class EntrySerializer
// update and write crcs
CRC32 crc = Crc.crc32();
out.position(start);
- out.limit(fixedCrcPosition);
+ out.limit(headerCrcPosition);
crc.update(out);
out.limit(recordEnd);
out.putInt((int) crc.getValue());
@@ -87,9 +80,9 @@ public final class EntrySerializer
Invariants.checkState(totalSize == from.remaining());
CRC32 crc = Crc.crc32();
- int fixedSize = EntrySerializer.fixedEntrySize(keySupport,
userVersion);
- int fixedCrc = readAndUpdateFixedCrc(crc, from, fixedSize);
- validateCRC(crc, fixedCrc);
+ int headerSize = EntrySerializer.headerSize(keySupport,
userVersion);
+ int headerCrc = readAndUpdateHeaderCrc(crc, from, headerSize);
+ validateCRC(crc, headerCrc);
int recordCrc = readAndUpdateRecordCrc(crc, from, start +
totalSize);
validateCRC(crc, recordCrc);
@@ -121,15 +114,15 @@ public final class EntrySerializer
return handleReadException(new EOFException(), from.limit(),
syncedOffset);
{
- int fixedSize = EntrySerializer.fixedEntrySize(keySupport,
userVersion);
- int fixedCrc = readAndUpdateFixedCrc(crc, from, fixedSize);
+ int headerSize = EntrySerializer.headerSize(keySupport,
userVersion);
+ int headerCrc = readAndUpdateHeaderCrc(crc, from, headerSize);
try
{
- validateCRC(crc, fixedCrc);
+ validateCRC(crc, headerCrc);
}
catch (IOException e)
{
- return handleReadException(e, from.position() + fixedSize,
syncedOffset);
+ return handleReadException(e, from.position() + headerSize,
syncedOffset);
}
int recordCrc = readAndUpdateRecordCrc(crc, from, start +
totalSize);
@@ -151,26 +144,18 @@ public final class EntrySerializer
{
from.position(start + TypeSizes.INT_SIZE);
into.key = keySupport.deserialize(from, userVersion);
- int hostCount = from.getShort();
-
from.position(from.position() + 4);
- for (int i = 0; i < hostCount; i++)
- {
- int hostId = from.getInt();
- into.hosts.add(hostId);
- }
-
into.value = from;
into.userVersion = userVersion;
}
- private static int readAndUpdateFixedCrc(CRC32 crc, ByteBuffer from, int
fixedSize)
+ private static int readAndUpdateHeaderCrc(CRC32 crc, ByteBuffer from, int
headerSize)
{
- int fixedEnd = from.position() + fixedSize - TypeSizes.INT_SIZE;
- int fixedCrc = from.getInt(fixedEnd);
- from.limit(fixedEnd);
+ int headerEnd = from.position() + headerSize - TypeSizes.INT_SIZE;
+ int headerCrc = from.getInt(headerEnd);
+ from.limit(headerEnd);
crc.update(from);
- return fixedCrc;
+ return headerCrc;
}
private static int readAndUpdateRecordCrc(CRC32 crc, ByteBuffer from, int
limit)
@@ -192,25 +177,17 @@ public final class EntrySerializer
return -1;
}
- static <K> int fixedEntrySize(KeySupport<K> keySupport, int userVersion)
+ static <K> int headerSize(KeySupport<K> keySupport, int userVersion)
{
- return keySupport.serializedSize(userVersion) // key/id
- + TypeSizes.SHORT_SIZE // host count
- + TypeSizes.INT_SIZE // total size
+ return TypeSizes.INT_SIZE // pointer to next entry
+ + keySupport.serializedSize(userVersion) // key/id
+ TypeSizes.INT_SIZE; // CRC
}
- static int variableEntrySize(int hostCount)
- {
- return TypeSizes.INT_SIZE * hostCount // hosts
- + TypeSizes.INT_SIZE; // CRC
- }
-
public static final class EntryHolder<K>
{
public K key;
public ByteBuffer value;
- public IntHashSet hosts = new IntHashSet();
public int userVersion;
@@ -218,7 +195,6 @@ public final class EntrySerializer
{
key = null;
value = null;
- hosts.clear();
}
}
}
diff --git a/src/java/org/apache/cassandra/journal/Journal.java
b/src/java/org/apache/cassandra/journal/Journal.java
index 19d1c1d78fa..2a875a6de21 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.PriorityQueue;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -74,9 +73,7 @@ import static
org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue;
* A generic append-only journal with some special features:
* <p><ul>
* <li>Records can be looked up by key
- * <li>Records can be tagged with multiple owner node ids
- * <li>Records can be invalidated by their owner ids
- * <li>Fully invalidated records get purged during segment compaction
+ * <li>Invalidated records get purged during segment compaction
* </ul><p>
*
* Type parameters:
@@ -265,11 +262,11 @@ public class Journal<K, V> implements Shutdownable
compactor.shutdown();
compactor.awaitTermination(1, TimeUnit.MINUTES);
flusher.shutdown();
- closer.shutdown();
+ closeAllSegments();
releaser.shutdown();
+ closer.shutdown();
closer.awaitTermination(1, TimeUnit.MINUTES);
releaser.awaitTermination(1, TimeUnit.MINUTES);
- closeAllSegments();
metrics.deregister();
Invariants.checkState(state.compareAndSet(State.SHUTDOWN,
State.TERMINATED),
"Unexpected journal state while trying to
shut down", state);
@@ -349,7 +346,7 @@ public class Journal<K, V> implements Shutdownable
public List<V> readAll(K id)
{
List<V> res = new ArrayList<>(2);
- readAll(id, (segment, position, key, buffer, hosts, userVersion) -> {
+ readAll(id, (segment, position, key, buffer, userVersion) -> {
try (DataInputBuffer in = new DataInputBuffer(buffer, false))
{
res.add(valueSerializer.deserialize(key, in, userVersion));
@@ -448,15 +445,14 @@ public class Journal<K, V> implements Shutdownable
*
* @param id user-provided record id, expected to roughly correlate with time and go up
* @param record the record to store
- * @param hosts hosts expected to invalidate the record
*/
- public void blockingWrite(K id, V record, Set<Integer> hosts)
+ public void blockingWrite(K id, V record)
{
try (DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get())
{
valueSerializer.serialize(id, record, dob, params.userVersion());
- ActiveSegment<K, V>.Allocation alloc = allocate(dob.getLength(),
hosts);
- alloc.writeInternal(id, dob.unsafeGetBufferAndFlip(), hosts);
+ ActiveSegment<K, V>.Allocation alloc = allocate(dob.getLength());
+ alloc.writeInternal(id, dob.unsafeGetBufferAndFlip());
flusher.flushAndAwaitDurable(alloc);
}
catch (IOException e)
@@ -474,20 +470,19 @@ public class Journal<K, V> implements Shutdownable
*
* @param id user-provided record id, expected to roughly correlate with time and go up
* @param record the record to store
- * @param hosts hosts expected to invalidate the record
*/
- public RecordPointer asyncWrite(K id, V record, Set<Integer> hosts)
+ public RecordPointer asyncWrite(K id, V record)
{
- return asyncWrite(id, (out, userVersion) ->
valueSerializer.serialize(id, record, out, userVersion), hosts);
+ return asyncWrite(id, (out, userVersion) ->
valueSerializer.serialize(id, record, out, userVersion));
}
- public RecordPointer asyncWrite(K id, Writer writer, Set<Integer> hosts)
+ public RecordPointer asyncWrite(K id, Writer writer)
{
try (DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get())
{
writer.write(dob, params.userVersion());
- ActiveSegment<K, V>.Allocation alloc = allocate(dob.getLength(),
hosts);
- alloc.write(id, dob.unsafeGetBufferAndFlip(), hosts);
+ ActiveSegment<K, V>.Allocation alloc = allocate(dob.getLength());
+ alloc.write(id, dob.unsafeGetBufferAndFlip());
return flusher.flush(alloc);
}
catch (IOException e)
@@ -497,12 +492,12 @@ public class Journal<K, V> implements Shutdownable
}
}
- private ActiveSegment<K, V>.Allocation allocate(int entrySize,
Set<Integer> hosts)
+ private ActiveSegment<K, V>.Allocation allocate(int entrySize)
{
ActiveSegment<K, V> segment = currentSegment;
ActiveSegment<K, V>.Allocation alloc;
- while (null == (alloc = segment.allocate(entrySize, hosts)))
+ while (null == (alloc = segment.allocate(entrySize)))
{
if (entrySize >= (params.segmentSize() * 3) / 4)
throw new IllegalStateException("entrySize " + entrySize + "
too large for a segmentSize of " + params.segmentSize());
@@ -972,7 +967,7 @@ public class Journal<K, V> implements Shutdownable
break;
Invariants.checkState(next == readers.poll());
- reader.accept(next.descriptor.timestamp, next.offset,
next.key(), next.record(), next.hosts(), next.descriptor.userVersion);
+ reader.accept(next.descriptor.timestamp, next.offset,
next.key(), next.record(), next.descriptor.userVersion);
if (next.advance())
readers.add(next);
else
diff --git a/src/java/org/apache/cassandra/journal/Metadata.java
b/src/java/org/apache/cassandra/journal/Metadata.java
index fcdbe1fba2f..2742816316d 100644
--- a/src/java/org/apache/cassandra/journal/Metadata.java
+++ b/src/java/org/apache/cassandra/journal/Metadata.java
@@ -19,15 +19,9 @@ package org.apache.cassandra.journal;
import java.io.EOFException;
import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.zip.CRC32;
-import org.agrona.collections.Int2IntHashMap;
-import org.agrona.collections.IntHashSet;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.utils.Crc;
@@ -36,45 +30,32 @@ import static
org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
/**
* Tracks and serializes the following information:
- * - all the hosts with entries in the data segment and #of records each is tagged in;
- * used for compaction prioritisation and to act in response to topology changes
* - total count of records in this segment file
* used for compaction prioritisation
*/
final class Metadata
{
- private final Set<Integer> unmodifiableHosts;
- private final Map<Integer, Integer> recordsPerHost;
-
private int fsyncLimit;
+
private volatile int recordsCount;
private static final AtomicIntegerFieldUpdater<Metadata>
recordsCountUpdater =
AtomicIntegerFieldUpdater.newUpdater(Metadata.class, "recordsCount");
static Metadata create()
{
- return new Metadata(new ConcurrentHashMap<>(), 0);
+ return new Metadata(0);
}
- private Metadata(Map<Integer, Integer> recordsPerHost, int recordsCount)
+ private Metadata(int recordsCount)
{
- this.recordsPerHost = recordsPerHost;
this.recordsCount = recordsCount;
- this.unmodifiableHosts =
Collections.unmodifiableSet(recordsPerHost.keySet());
}
- void update(Set<Integer> hosts)
+ void update()
{
- updateHosts(hosts);
incrementRecordsCount();
}
- private void updateHosts(Set<Integer> hosts)
- {
- for (int host : hosts)
- recordsPerHost.compute(host, (k, v) -> null == v ? 1 : v + 1);
- }
-
int fsyncLimit()
{
return fsyncLimit;
@@ -85,16 +66,6 @@ final class Metadata
recordsCountUpdater.incrementAndGet(this);
}
- Set<Integer> hosts()
- {
- return unmodifiableHosts;
- }
-
- int count(int host)
- {
- return recordsPerHost.getOrDefault(host, 0);
- }
-
int totalCount()
{
return recordsCount;
@@ -103,64 +74,18 @@ final class Metadata
void write(DataOutputPlus out) throws IOException
{
CRC32 crc = Crc.crc32();
-
- /* Write records count per host */
-
- int size = recordsPerHost.size();
- out.writeInt(size);
- updateChecksumInt(crc, size);
-
- out.writeInt((int) crc.getValue());
-
- for (Map.Entry<Integer, Integer> entry : recordsPerHost.entrySet())
- {
- int host = entry.getKey();
- int count = entry.getValue();
-
- out.writeInt(host);
- out.writeInt(count);
-
- updateChecksumInt(crc, host);
- updateChecksumInt(crc, count);
- }
-
- /* Write records count */
-
out.writeInt(recordsCount);
updateChecksumInt(crc, recordsCount);
-
out.writeInt((int) crc.getValue());
}
static Metadata read(DataInputPlus in) throws IOException
{
CRC32 crc = Crc.crc32();
-
- /* Read records count per host */
-
- int size = in.readInt();
- updateChecksumInt(crc, size);
- validateCRC(crc, in.readInt());
-
- Int2IntHashMap recordsPerHost = new Int2IntHashMap(Integer.MIN_VALUE);
- for (int i = 0; i < size; i++)
- {
- int host = in.readInt();
- int count = in.readInt();
-
- updateChecksumInt(crc, host);
- updateChecksumInt(crc, count);
-
- recordsPerHost.put(host, count);
- }
-
- /* Read records count */
-
int recordsCount = in.readInt();
updateChecksumInt(crc, recordsCount);
-
validateCRC(crc, in.readInt());
- return new Metadata(recordsPerHost, recordsCount);
+ return new Metadata(recordsCount);
}
void persist(Descriptor descriptor)
@@ -195,20 +120,12 @@ final class Metadata
static <K> Metadata rebuild(Descriptor descriptor, KeySupport<K>
keySupport)
{
- Int2IntHashMap recordsPerHost = new Int2IntHashMap(Integer.MIN_VALUE);
int recordsCount = 0;
try (StaticSegment.SequentialReader<K> reader =
StaticSegment.sequentialReader(descriptor, keySupport, Integer.MAX_VALUE))
{
while (reader.advance())
- {
- // iterator is cached and reused by IntHashSet
- IntHashSet.IntIterator hosts = reader.hosts().iterator();
- while (hosts.hasNext())
- recordsPerHost.merge(hosts.nextValue(), 1, Integer::sum);
-
++recordsCount;
- }
}
catch (JournalReadError e)
{
@@ -217,7 +134,7 @@ final class Metadata
throw e;
}
- return new Metadata(recordsPerHost, recordsCount);
+ return new Metadata(recordsCount);
}
static <K> Metadata rebuildAndPersist(Descriptor descriptor, KeySupport<K>
keySupport)
diff --git a/src/java/org/apache/cassandra/journal/RecordConsumer.java
b/src/java/org/apache/cassandra/journal/RecordConsumer.java
index e16194001dd..22d2bc4e9f8 100644
--- a/src/java/org/apache/cassandra/journal/RecordConsumer.java
+++ b/src/java/org/apache/cassandra/journal/RecordConsumer.java
@@ -19,10 +19,8 @@ package org.apache.cassandra.journal;
import java.nio.ByteBuffer;
-import org.agrona.collections.IntHashSet;
-
@FunctionalInterface
public interface RecordConsumer<K>
{
- void accept(long segment, int position, K key, ByteBuffer buffer,
IntHashSet hosts, int userVersion);
+ void accept(long segment, int position, K key, ByteBuffer buffer, int
userVersion);
}
diff --git a/src/java/org/apache/cassandra/journal/Segment.java
b/src/java/org/apache/cassandra/journal/Segment.java
index 2e441a13986..5faee3dc41b 100644
--- a/src/java/org/apache/cassandra/journal/Segment.java
+++ b/src/java/org/apache/cassandra/journal/Segment.java
@@ -92,7 +92,7 @@ public abstract class Segment<K, V> implements
SelfRefCounted<Segment<K, V>>, Co
if (read(offset, size, into))
{
Invariants.checkState(id.equals(into.key), "Index for %s read
incorrect key: expected %s but read %s", descriptor, id, into.key);
- consumer.accept(descriptor.timestamp, offset, id, into.value,
into.hosts, descriptor.userVersion);
+ consumer.accept(descriptor.timestamp, offset, id, into.value,
descriptor.userVersion);
return true;
}
return false;
@@ -118,7 +118,7 @@ public abstract class Segment<K, V> implements
SelfRefCounted<Segment<K, V>>, Co
Invariants.checkState(offset < prevOffset);
Invariants.checkState(read(offset, size, into), "Read should
always return true");
Invariants.checkState(id.equals(into.key), "Index for %s read
incorrect key: expected %s but read %s", descriptor, id, into.key);
- onEntry.accept(descriptor.timestamp, offset, into.key, into.value,
into.hosts, into.userVersion);
+ onEntry.accept(descriptor.timestamp, offset, into.key, into.value,
into.userVersion);
}
}
diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java
b/src/java/org/apache/cassandra/journal/StaticSegment.java
index 5f83a6eb1be..8c80d32bec1 100644
--- a/src/java/org/apache/cassandra/journal/StaticSegment.java
+++ b/src/java/org/apache/cassandra/journal/StaticSegment.java
@@ -27,7 +27,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import org.agrona.collections.IntHashSet;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.Closeable;
@@ -256,7 +255,7 @@ public final class StaticSegment<K, V> extends Segment<K, V>
{
while (reader.advance())
{
- consumer.accept(descriptor.timestamp, reader.offset(),
reader.key(), reader.record(), reader.hosts(), descriptor.userVersion);
+ consumer.accept(descriptor.timestamp, reader.offset(),
reader.key(), reader.record(), descriptor.userVersion);
}
}
}
@@ -322,12 +321,6 @@ public final class StaticSegment<K, V> extends Segment<K,
V>
return holder.key;
}
- public IntHashSet hosts()
- {
- ensureHasAdvanced();
- return holder.hosts;
- }
-
public ByteBuffer record()
{
ensureHasAdvanced();
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 5a0a6e7b5b0..b33f1bfe6a6 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -23,7 +23,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.NavigableMap;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
@@ -100,8 +99,6 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
Invariants.checkState(MessagingService.current_version ==
MessagingService.VERSION_51, "Expected current version to be %d but given %d",
MessagingService.VERSION_51, MessagingService.current_version);
}
- private static final Set<Integer> SENTINEL_HOSTS =
Collections.singleton(0);
-
static final ThreadLocal<byte[]> keyCRCBytes = ThreadLocal.withInitial(()
-> new byte[JournalKeySupport.TOTAL_SIZE]);
private final Journal<JournalKey, Object> journal;
@@ -295,7 +292,7 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
}
JournalKey key = new JournalKey(update.txnId,
JournalKey.Type.COMMAND_DIFF, store);
- RecordPointer pointer = journal.asyncWrite(key, diff, SENTINEL_HOSTS);
+ RecordPointer pointer = journal.asyncWrite(key, diff);
if (journalTable.shouldIndex(key)
&& diff.hasParticipants()
&& diff.after.route() != null)
@@ -396,7 +393,7 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
private RecordPointer appendInternal(JournalKey key, Object write)
{
AccordJournalValueSerializers.FlyweightSerializer<Object, ?>
serializer = (AccordJournalValueSerializers.FlyweightSerializer<Object, ?>)
key.type.serializer;
- return journal.asyncWrite(key, (out, userVersion) ->
serializer.serialize(key, write, out, userVersion), SENTINEL_HOSTS);
+ return journal.asyncWrite(key, (out, userVersion) ->
serializer.serialize(key, write, out, userVersion));
}
@VisibleForTesting
@@ -454,12 +451,12 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
if (key.type != JournalKey.Type.COMMAND_DIFF)
{
// TODO (required): add "skip" for the key to avoid getting stuck
- iter.readAllForKey(key, (segment, position, key1, buffer,
hosts, userVersion) -> {});
+ iter.readAllForKey(key, (segment, position, key1, buffer,
userVersion) -> {});
continue;
}
JournalKey finalKey = key;
- iter.readAllForKey(key, (segment, position, local, buffer,
hosts, userVersion) -> {
+ iter.readAllForKey(key, (segment, position, local, buffer,
userVersion) -> {
Invariants.checkState(finalKey.equals(local));
try (DataInputBuffer in = new DataInputBuffer(buffer,
false))
{
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
index 9981f5a9571..9f83eba75f4 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.utils.Invariants;
-import org.agrona.collections.IntHashSet;
import org.agrona.collections.LongHashSet;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.Operator;
@@ -85,8 +84,6 @@ public class AccordJournalTable<K extends JournalKey, V>
implements RangeSearche
{
private static final Logger logger =
LoggerFactory.getLogger(AccordJournalTable.class);
- private static final IntHashSet SENTINEL_HOSTS = new IntHashSet();
-
private final Journal<K, V> journal;
private final ColumnFamilyStore cfs;
@@ -166,7 +163,7 @@ public class AccordJournalTable<K extends JournalKey, V>
implements RangeSearche
}
@Override
- public void accept(long segment, int position, K key, ByteBuffer
buffer, IntHashSet hosts, int userVersion)
+ public void accept(long segment, int position, K key, ByteBuffer
buffer, int userVersion)
{
readBuffer(buffer, reader, userVersion);
}
@@ -194,10 +191,10 @@ public class AccordJournalTable<K extends JournalKey, V>
implements RangeSearche
}
@Override
- public void accept(long segment, int position, K key, ByteBuffer
buffer, IntHashSet hosts, int userVersion)
+ public void accept(long segment, int position, K key, ByteBuffer
buffer, int userVersion)
{
visit(segment);
- super.accept(segment, position, key, buffer, hosts, userVersion);
+ super.accept(segment, position, key, buffer, userVersion);
}
}
@@ -219,10 +216,10 @@ public class AccordJournalTable<K extends JournalKey, V>
implements RangeSearche
}
@Override
- public void accept(long segment, int position, K key, ByteBuffer
buffer, IntHashSet hosts, int userVersion)
+ public void accept(long segment, int position, K key, ByteBuffer
buffer, int userVersion)
{
if (!tableRecordConsumer.visited(segment))
- super.accept(segment, position, key, buffer, hosts,
userVersion);
+ super.accept(segment, position, key, buffer, userVersion);
}
}
@@ -383,10 +380,9 @@ public class AccordJournalTable<K extends JournalKey, V>
implements RangeSearche
into.key = key;
into.value = row.getCell(recordColumn).buffer();
- into.hosts = SENTINEL_HOSTS;
into.userVersion =
Int32Type.instance.compose(row.getCell(versionColumn).buffer());
- onEntry.accept(descriptor, position, into.key, into.value, into.hosts,
into.userVersion);
+ onEntry.accept(descriptor, position, into.key, into.value,
into.userVersion);
}
@SuppressWarnings("resource") // Auto-closeable iterator will release related resources
@@ -434,9 +430,9 @@ public class AccordJournalTable<K extends JournalKey, V>
implements RangeSearche
{
EntryHolder<K> into = new EntryHolder<>();
// TODO: use flyweight to avoid allocating extra lambdas?
- readRow(key, partition.next(), into, (segment, position, key1,
buffer, hosts, userVersion) -> {
+ readRow(key, partition.next(), into, (segment, position, key1,
buffer, userVersion) -> {
visit(segment);
- recordConsumer.accept(segment, position, key1, buffer,
hosts, userVersion);
+ recordConsumer.accept(segment, position, key1, buffer,
userVersion);
});
}
@@ -501,9 +497,9 @@ public class AccordJournalTable<K extends JournalKey, V>
implements RangeSearche
K tableKey = (K)tableIterator.key();
K journalKey = staticSegmentIterator.key();
if (journalKey != null && keySupport.compare(journalKey, key) == 0)
- staticSegmentIterator.readAllForKey(key, (segment, position,
key1, buffer, hosts, userVersion) -> {
+ staticSegmentIterator.readAllForKey(key, (segment, position,
key1, buffer, userVersion) -> {
if (!tableIterator.visited(segment))
- reader.accept(segment, position, key1, buffer, hosts,
userVersion);
+ reader.accept(segment, position, key1, buffer,
userVersion);
});
if (tableKey != null && keySupport.compare(tableKey, key) == 0)
diff --git
a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
index a1235f7f03a..36040e115c1 100644
---
a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
+++
b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.simulator.test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.zip.Checksum;
@@ -96,7 +95,7 @@ public class AccordJournalSimulationTest extends
SimulationTestBase
{
int finalI = i;
State.executor.submit(() -> {
- RecordPointer ptr = State.journal.asyncWrite("test" +
finalI, "test" + finalI, Collections.singleton(1));
+ RecordPointer ptr = State.journal.asyncWrite("test" +
finalI, "test" + finalI);
State.journal.onDurable(ptr, State.latch::decrement);
});
}
diff --git a/test/unit/org/apache/cassandra/journal/JournalTest.java
b/test/unit/org/apache/cassandra/journal/JournalTest.java
index 7dd1c948a81..de6848b289a 100644
--- a/test/unit/org/apache/cassandra/journal/JournalTest.java
+++ b/test/unit/org/apache/cassandra/journal/JournalTest.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.journal;
import java.io.IOException;
import java.nio.file.Files;
-import java.util.Collections;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -50,8 +49,7 @@ public class JournalTest
directory.deleteRecursiveOnExit();
Journal<TimeUUID, Long> journal =
- new Journal<>("TestJournal", directory, TestParams.INSTANCE,
TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE, SegmentCompactor.noop());
-
+ new Journal<>("TestJournal", directory, TestParams.INSTANCE,
TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE, SegmentCompactor.noop());
journal.start();
@@ -60,10 +58,10 @@ public class JournalTest
TimeUUID id3 = nextTimeUUID();
TimeUUID id4 = nextTimeUUID();
- journal.blockingWrite(id1, 1L, Collections.singleton(1));
- journal.blockingWrite(id2, 2L, Collections.singleton(1));
- journal.blockingWrite(id3, 3L, Collections.singleton(1));
- journal.blockingWrite(id4, 4L, Collections.singleton(1));
+ journal.blockingWrite(id1, 1L);
+ journal.blockingWrite(id2, 2L);
+ journal.blockingWrite(id3, 3L);
+ journal.blockingWrite(id4, 4L);
assertEquals(1L, (long) journal.readLast(id1));
assertEquals(2L, (long) journal.readLast(id2));
diff --git a/test/unit/org/apache/cassandra/journal/MetadataTest.java
b/test/unit/org/apache/cassandra/journal/MetadataTest.java
index a10aaff82cb..2ad9c14edad 100644
--- a/test/unit/org/apache/cassandra/journal/MetadataTest.java
+++ b/test/unit/org/apache/cassandra/journal/MetadataTest.java
@@ -19,10 +19,6 @@ package org.apache.cassandra.journal;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
import org.junit.Test;
@@ -36,47 +32,25 @@ public class MetadataTest
@Test
public void testUpdate()
{
- Random rng = new Random();
-
- int host1 = rng.nextInt();
- int host2 = host1 + 1;
- int host3 = host2 + 1;
- int host4 = host3 + 1;
- int host5 = host4 + 1;
-
Metadata metadata = Metadata.create();
- metadata.update(set(host1));
- metadata.update(set(host2, host3));
- metadata.update(set(host1, host4));
- metadata.update(set(host1, host2, host3, host4));
+ metadata.update();
+ metadata.update();
+ metadata.update();
+ metadata.update();
- assertEquals(set(host1, host2, host3, host4), metadata.hosts());
- assertEquals(3, metadata.count(host1));
- assertEquals(2, metadata.count(host2));
- assertEquals(2, metadata.count(host3));
- assertEquals(2, metadata.count(host4));
- assertEquals(0, metadata.count(host5));
assertEquals(4, metadata.totalCount());
}
@Test
public void testWriteRead() throws IOException
{
- Random rng = new Random();
-
- int host1 = rng.nextInt();
- int host2 = host1 + 1;
- int host3 = host2 + 1;
- int host4 = host3 + 1;
- int host5 = host4 + 1;
-
Metadata metadata = Metadata.create();
- metadata.update(set(host1));
- metadata.update(set(host2, host3));
- metadata.update(set(host1, host4));
- metadata.update(set(host1, host2, host3, host4));
+ metadata.update();
+ metadata.update();
+ metadata.update();
+ metadata.update();
try (DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get())
{
@@ -86,20 +60,8 @@ public class MetadataTest
try (DataInputBuffer in = new DataInputBuffer(serialized, false))
{
Metadata deserialized = Metadata.read(in);
-
- assertEquals(set(host1, host2, host3, host4),
deserialized.hosts());
- assertEquals(3, deserialized.count(host1));
- assertEquals(2, deserialized.count(host2));
- assertEquals(2, deserialized.count(host3));
- assertEquals(2, deserialized.count(host4));
- assertEquals(0, deserialized.count(host5));
assertEquals(4, deserialized.totalCount());
}
}
}
-
- private static Set<Integer> set(Integer... ids)
- {
- return new HashSet<>(Arrays.asList(ids));
- }
}
diff --git a/test/unit/org/apache/cassandra/journal/SegmentTest.java
b/test/unit/org/apache/cassandra/journal/SegmentTest.java
index 8700c805fad..3c1e7fee8a0 100644
--- a/test/unit/org/apache/cassandra/journal/SegmentTest.java
+++ b/test/unit/org/apache/cassandra/journal/SegmentTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.journal;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
-import java.util.*;
import org.junit.Test;
@@ -51,18 +50,6 @@ public class SegmentTest
ByteBuffer record3 = ByteBufferUtil.bytes("sample record 3");
ByteBuffer record4 = ByteBufferUtil.bytes("sample record 4");
- Random rng = new Random();
-
- int host1 = rng.nextInt();
- int host2 = rng.nextInt();
- int host3 = rng.nextInt();
- int host4 = rng.nextInt();
-
- Set<Integer> hosts1 = set(host1);
- Set<Integer> hosts2 = set(host1, host2);
- Set<Integer> hosts3 = set(host1, host2, host3);
- Set<Integer> hosts4 = set(host4);
-
File directory = new File(Files.createTempDirectory(null));
directory.deleteRecursiveOnExit();
@@ -70,32 +57,28 @@ public class SegmentTest
ActiveSegment<TimeUUID, ByteBuffer> segment =
ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE);
- segment.allocate(record1.remaining(), hosts1).write(id1, record1,
hosts1);
- segment.allocate(record2.remaining(), hosts2).write(id2, record2,
hosts2);
- segment.allocate(record3.remaining(), hosts3).write(id3, record3,
hosts3);
- segment.allocate(record4.remaining(), hosts4).write(id4, record4,
hosts4);
+ segment.allocate(record1.remaining()).write(id1, record1);
+ segment.allocate(record2.remaining()).write(id2, record2);
+ segment.allocate(record3.remaining()).write(id3, record3);
+ segment.allocate(record4.remaining()).write(id4, record4);
// read all 4 entries by id and compare with originals
EntrySerializer.EntryHolder<TimeUUID> holder = new
EntrySerializer.EntryHolder<>();
segment.readLast(id1, holder);
assertEquals(id1, holder.key);
- assertEquals(hosts1, holder.hosts);
assertEquals(record1, holder.value);
segment.readLast(id2, holder);
assertEquals(id2, holder.key);
- assertEquals(hosts2, holder.hosts);
assertEquals(record2, holder.value);
segment.readLast(id3, holder);
assertEquals(id3, holder.key);
- assertEquals(hosts3, holder.hosts);
assertEquals(record3, holder.value);
segment.readLast(id4, holder);
assertEquals(id4, holder.key);
- assertEquals(hosts4, holder.hosts);
assertEquals(record4, holder.value);
}
@@ -114,18 +97,6 @@ public class SegmentTest
ByteBuffer record3 = ByteBufferUtil.bytes("sample record 3");
ByteBuffer record4 = ByteBufferUtil.bytes("sample record 4");
- Random rng = new Random();
-
- int host1 = rng.nextInt();
- int host2 = rng.nextInt();
- int host3 = rng.nextInt();
- int host4 = rng.nextInt();
-
- Set<Integer> hosts1 = set(host1);
- Set<Integer> hosts2 = set(host1, host2);
- Set<Integer> hosts3 = set(host1, host2, host3);
- Set<Integer> hosts4 = set(host4);
-
File directory = new File(Files.createTempDirectory(null));
directory.deleteRecursiveOnExit();
@@ -133,10 +104,10 @@ public class SegmentTest
ActiveSegment<TimeUUID, ByteBuffer> activeSegment =
ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE);
- activeSegment.allocate(record1.remaining(), hosts1).write(id1,
record1, hosts1);
- activeSegment.allocate(record2.remaining(), hosts2).write(id2,
record2, hosts2);
- activeSegment.allocate(record3.remaining(), hosts3).write(id3,
record3, hosts3);
- activeSegment.allocate(record4.remaining(), hosts4).write(id4,
record4, hosts4);
+ activeSegment.allocate(record1.remaining()).write(id1, record1);
+ activeSegment.allocate(record2.remaining()).write(id2, record2);
+ activeSegment.allocate(record3.remaining()).write(id3, record3);
+ activeSegment.allocate(record4.remaining()).write(id4, record4);
activeSegment.close(null);
@@ -147,22 +118,18 @@ public class SegmentTest
staticSegment.readLast(id1, holder);
assertEquals(id1, holder.key);
- assertEquals(hosts1, holder.hosts);
assertEquals(record1, holder.value);
staticSegment.readLast(id2, holder);
assertEquals(id2, holder.key);
- assertEquals(hosts2, holder.hosts);
assertEquals(record2, holder.value);
staticSegment.readLast(id3, holder);
assertEquals(id3, holder.key);
- assertEquals(hosts3, holder.hosts);
assertEquals(record3, holder.value);
staticSegment.readLast(id4, holder);
assertEquals(id4, holder.key);
- assertEquals(hosts4, holder.hosts);
assertEquals(record4, holder.value);
}
@@ -179,18 +146,6 @@ public class SegmentTest
ByteBuffer record3 = ByteBufferUtil.bytes("sample record 3");
ByteBuffer record4 = ByteBufferUtil.bytes("sample record 4");
- Random rng = new Random();
-
- int host1 = rng.nextInt();
- int host2 = rng.nextInt();
- int host3 = rng.nextInt();
- int host4 = rng.nextInt();
-
- Set<Integer> hosts1 = set(host1);
- Set<Integer> hosts2 = set(host1, host2);
- Set<Integer> hosts3 = set(host1, host2, host3);
- Set<Integer> hosts4 = set(host4);
-
File directory = new File(Files.createTempDirectory(null));
directory.deleteRecursiveOnExit();
@@ -198,10 +153,10 @@ public class SegmentTest
ActiveSegment<TimeUUID, ByteBuffer> activeSegment =
ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE);
- activeSegment.allocate(record1.remaining(), hosts1).write(id1,
record1, hosts1);
- activeSegment.allocate(record2.remaining(), hosts2).write(id2,
record2, hosts2);
- activeSegment.allocate(record3.remaining(), hosts3).write(id3,
record3, hosts3);
- activeSegment.allocate(record4.remaining(), hosts4).write(id4,
record4, hosts4);
+ activeSegment.allocate(record1.remaining()).write(id1, record1);
+ activeSegment.allocate(record2.remaining()).write(id2, record2);
+ activeSegment.allocate(record3.remaining()).write(id3, record3);
+ activeSegment.allocate(record4.remaining()).write(id4, record4);
Segment.Tidier tidier =
(Segment.Tidier)activeSegment.selfRef().tidier();
tidier.executor = ImmediateExecutor.INSTANCE;
@@ -215,32 +170,23 @@ public class SegmentTest
// read all 4 entries sequentially and compare with originals
assertTrue(reader.advance());
assertEquals(id1, reader.key());
- assertEquals(hosts1, reader.hosts());
assertEquals(record1, reader.record());
assertTrue(reader.advance());
assertEquals(id2, reader.key());
- assertEquals(hosts2, reader.hosts());
assertEquals(record2, reader.record());
assertTrue(reader.advance());
assertEquals(id3, reader.key());
- assertEquals(hosts3, reader.hosts());
assertEquals(record3, reader.record());
assertTrue(reader.advance());
assertEquals(id4, reader.key());
- assertEquals(hosts4, reader.hosts());
assertEquals(record4, reader.record());
assertFalse(reader.advance());
}
- private static Set<Integer> set(Integer... ids)
- {
- return new HashSet<>(Arrays.asList(ids));
- }
-
private static Params params()
{
return TestParams.INSTANCE;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org