This is an automated email from the ASF dual-hosted git repository.
iamaleksey pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-45-mutation-tracking by
this push:
new e982cf69b4 Fix MutationId class hierarchy and related maps lookup
issues
e982cf69b4 is described below
commit e982cf69b4855dc863327979d5537e685dba21f1
Author: Aleksey Yeshchenko <[email protected]>
AuthorDate: Tue May 12 13:44:45 2026 +0100
Fix MutationId class hierarchy and related maps lookup issues
patch by Aleksey Yeschenko; reviewed by Ariel Weisberg for
CASSANDRA-21366
---
.../cassandra/replication/CoordinatorLogId.java | 34 +++-------
.../replication/MutableCoordinatorLogOffsets.java | 8 +--
.../apache/cassandra/replication/MutationId.java | 35 +++++------
.../replication/MutationTrackingService.java | 7 +--
.../cassandra/replication/OutgoingMutations.java | 2 +-
.../org/apache/cassandra/replication/Shard.java | 2 +-
.../cassandra/replication/ShortMutationId.java | 73 ++++++++++++++++++----
.../service/reads/tracked/TrackedLocalReads.java | 2 +-
8 files changed, 95 insertions(+), 68 deletions(-)
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
index bebb5a4024..7d0929b2ab 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
@@ -30,17 +30,21 @@ import org.apache.cassandra.io.util.DataOutputPlus;
public class CoordinatorLogId implements Serializable
{
- private static final CoordinatorLogId NONE = new
CoordinatorLogId(Integer.MIN_VALUE, Integer.MIN_VALUE);
+ static final int NONE_HOST_ID = Integer.MIN_VALUE;
+ static final int NONE_HOST_LOG_ID = Integer.MIN_VALUE;
+
+ static final long NONE_LOG_ID = asLong(NONE_HOST_ID, NONE_HOST_LOG_ID);
+ static final CoordinatorLogId NONE = new CoordinatorLogId(NONE_LOG_ID);
/** TCM host ID */
- protected final int hostId;
+ public final int hostId;
/**
* Host log ID (unique within the host).
* Allocated anew on host restart - one per token range replicated by the
host.
* Persisted on allocation, unique within the host.
*/
- protected final int hostLogId;
+ public final int hostLogId;
CoordinatorLogId(long id)
{
@@ -84,11 +88,6 @@ public class CoordinatorLogId implements Serializable
return (int) coordinatorLogId;
}
- public static CoordinatorLogId none()
- {
- return NONE;
- }
-
public static CoordinatorLogId fromLong(long logId)
{
return new CoordinatorLogId(logId);
@@ -96,12 +95,12 @@ public class CoordinatorLogId implements Serializable
static boolean isNone(int hostId, int hostLogId)
{
- return hostId == NONE.hostId && hostLogId == NONE.hostLogId;
+ return hostId == NONE_HOST_ID && hostLogId == NONE_HOST_LOG_ID;
}
public boolean isNone()
{
- return this == NONE || isNone(hostId, hostLogId);
+ return isNone(hostId, hostLogId);
}
@Override
@@ -135,20 +134,12 @@ public class CoordinatorLogId implements Serializable
out.writeInt(logId.hostLogId);
}
- public void serialize(long logId, DataOutputPlus out, int version)
throws IOException
- {
- out.writeInt(hostId(logId));
- out.writeInt(hostLogId(logId));
- }
-
@Override
public CoordinatorLogId deserialize(DataInputPlus in, int version)
throws IOException
{
int hostId = in.readInt();
int hostLogId = in.readInt();
- if (isNone(hostId, hostLogId))
- return none();
- return new CoordinatorLogId(hostId, hostLogId);
+ return isNone(hostId, hostLogId) ? NONE : new
CoordinatorLogId(hostId, hostLogId);
}
@Override
@@ -156,11 +147,6 @@ public class CoordinatorLogId implements Serializable
{
return TypeSizes.sizeof(logId.hostId) +
TypeSizes.sizeof(logId.hostLogId);
}
-
- public long serializedSize(long logId, int version)
- {
- return TypeSizes.sizeof(logId);
- }
}
public static final Serializer serializer = new Serializer();
diff --git
a/src/java/org/apache/cassandra/replication/MutableCoordinatorLogOffsets.java
b/src/java/org/apache/cassandra/replication/MutableCoordinatorLogOffsets.java
index 078ebf6316..b0d1d5ab1a 100644
---
a/src/java/org/apache/cassandra/replication/MutableCoordinatorLogOffsets.java
+++
b/src/java/org/apache/cassandra/replication/MutableCoordinatorLogOffsets.java
@@ -32,7 +32,7 @@ public interface MutableCoordinatorLogOffsets extends
CoordinatorLogOffsets<Offs
{
for (long logId : mutations)
{
- if (CoordinatorLogId.none().asLong() == logId)
+ if (CoordinatorLogId.NONE_LOG_ID == logId)
continue;
Offsets offsets = mutations.offsets(logId);
@@ -42,10 +42,8 @@ public interface MutableCoordinatorLogOffsets extends
CoordinatorLogOffsets<Offs
default void addAll(Offsets from)
{
- if (from.logId().isNone())
- return;
-
- from.forEach(this::add);
+ if (!from.logId().isNone())
+ from.forEach(this::add);
}
static MutableCoordinatorLogOffsets create(boolean assumeExclusive)
diff --git a/src/java/org/apache/cassandra/replication/MutationId.java
b/src/java/org/apache/cassandra/replication/MutationId.java
index 16c3ba42d8..c8f92e97cf 100644
--- a/src/java/org/apache/cassandra/replication/MutationId.java
+++ b/src/java/org/apache/cassandra/replication/MutationId.java
@@ -26,6 +26,8 @@ import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import static org.apache.cassandra.replication.CoordinatorLogId.NONE_LOG_ID;
+
/**
* Full mutation id, with the addition of timestamp component.
* <p>
@@ -34,11 +36,10 @@ import org.apache.cassandra.io.util.DataOutputPlus;
*/
public class MutationId extends ShortMutationId
{
- private static final long NONE_LOG_ID = CoordinatorLogId.none().asLong();
- private static final long NONE_SEQUENCE_ID = Long.MIN_VALUE;
- private static final int NONE_OFFSET = offset(NONE_SEQUENCE_ID);
- private static final int NONE_TIMESTAMP = timestamp(NONE_SEQUENCE_ID);
- private static final MutationId NONE = new MutationId(NONE_LOG_ID,
NONE_SEQUENCE_ID);
+ static final int NONE_TIMESTAMP = 0;
+ static final long NONE_SEQUENCE_ID = sequenceId(NONE_OFFSET,
NONE_TIMESTAMP);
+
+ static final MutationId NONE = new MutationId(NONE_LOG_ID,
NONE_SEQUENCE_ID);
/**
* 4 byte timestamp. The timestamp is monotonically non-decreasing.
@@ -71,7 +72,7 @@ public class MutationId extends ShortMutationId
public static long sequenceId(int offset, int timestamp)
{
- return ((long) offset << 32) | timestamp;
+ return ((long) offset << 32) | (timestamp & 0xffffffffL);
}
public static int offset(long sequenceId)
@@ -95,17 +96,16 @@ public class MutationId extends ShortMutationId
return NONE;
}
+ @Override
public boolean isNone()
{
- if (this == NONE)
- return true;
- return logId() == NONE_LOG_ID && offset() == NONE_OFFSET &&
timestamp() == NONE_TIMESTAMP;
+ return logId() == NONE_LOG_ID && sequenceId() == NONE_SEQUENCE_ID;
}
@Override
public String toString()
{
- return "MutationId{" + hostId() + ", " + hostLogId() + ", " + offset()
+ ", " + timestamp() + '}';
+ return "MutationId{" + hostId + ", " + hostLogId + ", " + offset + ",
" + timestamp + '}';
}
/**
@@ -131,10 +131,9 @@ public class MutationId extends ShortMutationId
long logId = buffer.getLong(pos);
long sequenceId = buffer.getLong(pos + 8);
- if (logId == MutationId.none().logId() && sequenceId ==
MutationId.none().sequenceId())
- return MutationId.none();
-
- return new MutationId(logId, sequenceId);
+ return logId == NONE_LOG_ID && sequenceId == NONE_SEQUENCE_ID
+ ? NONE
+ : new MutationId(logId, sequenceId);
}
public static class Serializer implements IVersionedSerializer<MutationId>
@@ -151,9 +150,9 @@ public class MutationId extends ShortMutationId
{
long logId = in.readLong();
long sequenceId = in.readLong();
- if (logId == NONE_LOG_ID && sequenceId == NONE_SEQUENCE_ID)
- return none();
- return new MutationId(logId, sequenceId);
+ return logId == NONE_LOG_ID && sequenceId == NONE_SEQUENCE_ID
+ ? NONE
+ : new MutationId(logId, sequenceId);
}
@Override
@@ -167,7 +166,7 @@ public class MutationId extends ShortMutationId
in.readLong();
in.readLong();
}
- };
+ }
public static Serializer serializer = new Serializer();
}
diff --git
a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index 5d4aec80ca..c901a8c425 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -338,7 +338,7 @@ public class MutationTrackingService
try
{
Preconditions.checkArgument(!mutationId.isNone());
- Shard shard = getShardNullable(mutationId);
+ Shard shard = getShardNullable(mutationId.asLogId());
// A response to the coordinator (for a forwarded write) won't
have the coordinator log matching it
if (shard != null)
shard.receivedWriteResponse(mutationId, fromHost);
@@ -358,10 +358,7 @@ public class MutationTrackingService
logger.debug("{} receivedActivationAck from {}",
transfer.logPrefix(), fromHost);
Preconditions.checkArgument(!transfer.id().isNone());
- // REVIEW: This will be called with ShortMutationId, which
overrides hashCode from CoordinatorLogId, but map
- // is updated with CoordinatorLogId; shouldn't call this with a
ShortMutationId, not sure why that's working
- // elsewhere
- Shard shard = getShardNullable(new
CoordinatorLogId(transfer.id().logId()));
+ Shard shard = getShardNullable(transfer.id().asLogId());
// Local activation acknowledged in
MutationTrackingService.activateLocal
if (shard != null &&
!fromHost.equals(FBUtilities.getBroadcastAddressAndPort()))
shard.receivedActivationResponse(transfer, fromHost);
diff --git a/src/java/org/apache/cassandra/replication/OutgoingMutations.java
b/src/java/org/apache/cassandra/replication/OutgoingMutations.java
index 2733c5b8ca..9c470757e5 100644
--- a/src/java/org/apache/cassandra/replication/OutgoingMutations.java
+++ b/src/java/org/apache/cassandra/replication/OutgoingMutations.java
@@ -107,7 +107,7 @@ class OutgoingMutations
private long key(int hostId, int offset)
{
- return ((long) hostId << 32) | offset;
+ return ((long) hostId << 32) | (offset & 0xffffffffL);
}
private enum OutgoingStatus
diff --git a/src/java/org/apache/cassandra/replication/Shard.java
b/src/java/org/apache/cassandra/replication/Shard.java
index 9fd098d82f..265cc8b8f3 100644
--- a/src/java/org/apache/cassandra/replication/Shard.java
+++ b/src/java/org/apache/cassandra/replication/Shard.java
@@ -309,7 +309,7 @@ public class Shard
return getOrCreate(mutation.id());
}
- private CoordinatorLog getOrCreate(MutationId mutationId)
+ private CoordinatorLog getOrCreate(ShortMutationId mutationId)
{
Preconditions.checkArgument(!mutationId.isNone());
return getOrCreate(mutationId.logId());
diff --git a/src/java/org/apache/cassandra/replication/ShortMutationId.java
b/src/java/org/apache/cassandra/replication/ShortMutationId.java
index 93ea294cbe..9328764772 100644
--- a/src/java/org/apache/cassandra/replication/ShortMutationId.java
+++ b/src/java/org/apache/cassandra/replication/ShortMutationId.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.replication;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Comparator;
import org.apache.cassandra.db.TypeSizes;
@@ -25,39 +26,67 @@ import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import static org.apache.cassandra.replication.CoordinatorLogId.NONE_HOST_ID;
+import static
org.apache.cassandra.replication.CoordinatorLogId.NONE_HOST_LOG_ID;
+import static org.apache.cassandra.replication.CoordinatorLogId.NONE_LOG_ID;
+
/**
* MutationId without the timestamp component. This is sufficient for uniquely
identifying a mutation,
* and for lookup in the journal and most tracking data structures.
*/
-public class ShortMutationId extends CoordinatorLogId
+public class ShortMutationId implements Serializable
{
+ static final int NONE_OFFSET = Integer.MIN_VALUE;
+
+ static final ShortMutationId NONE = new ShortMutationId(NONE_LOG_ID,
NONE_OFFSET);
+
+ /** TCM host ID */
+ public final int hostId;
+
+ /**
+ * Host log ID (unique within the host).
+ * Allocated anew on host restart - one per token range replicated by the
host.
+ * Persisted on allocation, unique within the host.
+ */
+ public final int hostLogId;
+
/**
* 4 byte offset. Offest is incremented, is alone is sufficient to identify
* the entry within a coordinator log.
* MutationId adds a timestamp for correlation purposes.
*/
- protected final int offset;
+ public final int offset;
public ShortMutationId(long logId, int offset)
{
- super(logId);
- this.offset = offset;
+ this(CoordinatorLogId.hostId(logId),
CoordinatorLogId.hostLogId(logId), offset);
}
public ShortMutationId(CoordinatorLogId logId, int offset)
{
- super(logId.hostId(), logId.hostLogId());
+ this(logId.hostId, logId.hostLogId, offset);
+ }
+
+ private ShortMutationId(int hostId, int hostLogId, int offset)
+ {
+ this.hostId = hostId;
+ this.hostLogId = hostLogId;
this.offset = offset;
}
public ShortMutationId(MutationId mutationId)
{
- this(mutationId.logId(), mutationId.offset());
+ this(mutationId.hostLogId(), mutationId.hostId(), mutationId.offset());
}
- public long logId()
+ public int hostId()
+ {
+ return hostId;
+ }
+
+ public int hostLogId()
{
- return super.asLong();
+ return hostLogId;
}
public int offset()
@@ -65,8 +94,23 @@ public class ShortMutationId extends CoordinatorLogId
return offset;
}
+ public long logId()
+ {
+ return CoordinatorLogId.asLong(hostId, hostLogId);
+ }
+
+ public CoordinatorLogId asLogId()
+ {
+ return new CoordinatorLogId(hostId, hostLogId);
+ }
+
+ public boolean isNone()
+ {
+ return hostId == NONE_HOST_ID && hostLogId == NONE_HOST_LOG_ID &&
offset == NONE_OFFSET;
+ }
+
@Override
- public boolean equals(Object o)
+ public final boolean equals(Object o)
{
if (this == o) return true;
if (!(o instanceof ShortMutationId)) return false;
@@ -75,9 +119,9 @@ public class ShortMutationId extends CoordinatorLogId
}
@Override
- public int hashCode()
+ public final int hashCode()
{
- return Integer.hashCode(offset) + 31 * super.hashCode();
+ return Integer.hashCode(offset) + 31 * Long.hashCode(logId());
}
@Override
@@ -87,7 +131,7 @@ public class ShortMutationId extends CoordinatorLogId
}
public static final Comparator<ShortMutationId> comparator = (l, r) -> {
- int cmp = CoordinatorLogId.comparator.compare(l, r);
+ int cmp = Long.compareUnsigned(l.logId(), r.logId());
return cmp != 0 ? cmp : Integer.compare(l.offset, r.offset);
};
@@ -105,7 +149,10 @@ public class ShortMutationId extends CoordinatorLogId
{
long logId = in.readLong();
int offset = in.readInt();
- return new ShortMutationId(logId, offset);
+
+ return (logId == NONE_LOG_ID && offset == NONE_OFFSET)
+ ? NONE
+ : new ShortMutationId(logId, offset);
}
@Override
diff --git
a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
index 83daa5826a..8edfe44804 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
@@ -179,7 +179,7 @@ public class TrackedLocalReads implements
ExpiredStatePurger.Expireable
while (transferIds.hasNext())
{
ShortMutationId id = transferIds.next();
- builder.builderForLog(id).unreconciled.add(id.offset());
+ builder.builderForLog(id.asLogId()).unreconciled.add(id.offset());
}
return builder.build();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]