This is an automated email from the ASF dual-hosted git repository.

iamaleksey pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-45-mutation-tracking by 
this push:
     new e982cf69b4 Fix MutationId class hierarchy and related maps lookup 
issues
e982cf69b4 is described below

commit e982cf69b4855dc863327979d5537e685dba21f1
Author: Aleksey Yeshchenko <[email protected]>
AuthorDate: Tue May 12 13:44:45 2026 +0100

    Fix MutationId class hierarchy and related maps lookup issues
    
    patch by Aleksey Yeschenko; reviewed by Ariel Weisberg for
    CASSANDRA-21366
---
 .../cassandra/replication/CoordinatorLogId.java    | 34 +++-------
 .../replication/MutableCoordinatorLogOffsets.java  |  8 +--
 .../apache/cassandra/replication/MutationId.java   | 35 +++++------
 .../replication/MutationTrackingService.java       |  7 +--
 .../cassandra/replication/OutgoingMutations.java   |  2 +-
 .../org/apache/cassandra/replication/Shard.java    |  2 +-
 .../cassandra/replication/ShortMutationId.java     | 73 ++++++++++++++++++----
 .../service/reads/tracked/TrackedLocalReads.java   |  2 +-
 8 files changed, 95 insertions(+), 68 deletions(-)

diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java 
b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
index bebb5a4024..7d0929b2ab 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
@@ -30,17 +30,21 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 
 public class CoordinatorLogId implements Serializable
 {
-    private static final CoordinatorLogId NONE = new 
CoordinatorLogId(Integer.MIN_VALUE, Integer.MIN_VALUE);
+    static final int NONE_HOST_ID = Integer.MIN_VALUE;
+    static final int NONE_HOST_LOG_ID = Integer.MIN_VALUE;
+
+    static final long NONE_LOG_ID = asLong(NONE_HOST_ID, NONE_HOST_LOG_ID);
+    static final CoordinatorLogId NONE = new CoordinatorLogId(NONE_LOG_ID);
 
     /** TCM host ID */
-    protected final int hostId;
+    public final int hostId;
 
     /**
      * Host log ID (unique within the host).
      * Allocated anew on host restart - one per token range replicated by the 
host.
      * Persisted on allocation, unique within the host.
      */
-    protected final int hostLogId;
+    public final int hostLogId;
 
     CoordinatorLogId(long id)
     {
@@ -84,11 +88,6 @@ public class CoordinatorLogId implements Serializable
         return (int) coordinatorLogId;
     }
 
-    public static CoordinatorLogId none()
-    {
-        return NONE;
-    }
-
     public static CoordinatorLogId fromLong(long logId)
     {
         return new CoordinatorLogId(logId);
@@ -96,12 +95,12 @@ public class CoordinatorLogId implements Serializable
 
     static boolean isNone(int hostId, int hostLogId)
     {
-        return hostId == NONE.hostId && hostLogId == NONE.hostLogId;
+        return hostId == NONE_HOST_ID && hostLogId == NONE_HOST_LOG_ID;
     }
 
     public boolean isNone()
     {
-        return this == NONE || isNone(hostId, hostLogId);
+        return isNone(hostId, hostLogId);
     }
 
     @Override
@@ -135,20 +134,12 @@ public class CoordinatorLogId implements Serializable
             out.writeInt(logId.hostLogId);
         }
 
-        public void serialize(long logId, DataOutputPlus out, int version) 
throws IOException
-        {
-            out.writeInt(hostId(logId));
-            out.writeInt(hostLogId(logId));
-        }
-
         @Override
         public CoordinatorLogId deserialize(DataInputPlus in, int version) 
throws IOException
         {
             int hostId = in.readInt();
             int hostLogId = in.readInt();
-            if (isNone(hostId, hostLogId))
-                return none();
-            return new CoordinatorLogId(hostId, hostLogId);
+            return isNone(hostId, hostLogId) ? NONE : new 
CoordinatorLogId(hostId, hostLogId);
         }
 
         @Override
@@ -156,11 +147,6 @@ public class CoordinatorLogId implements Serializable
         {
             return TypeSizes.sizeof(logId.hostId) + 
TypeSizes.sizeof(logId.hostLogId);
         }
-
-        public long serializedSize(long logId, int version)
-        {
-            return TypeSizes.sizeof(logId);
-        }
     }
 
     public static final Serializer serializer = new Serializer();
diff --git 
a/src/java/org/apache/cassandra/replication/MutableCoordinatorLogOffsets.java 
b/src/java/org/apache/cassandra/replication/MutableCoordinatorLogOffsets.java
index 078ebf6316..b0d1d5ab1a 100644
--- 
a/src/java/org/apache/cassandra/replication/MutableCoordinatorLogOffsets.java
+++ 
b/src/java/org/apache/cassandra/replication/MutableCoordinatorLogOffsets.java
@@ -32,7 +32,7 @@ public interface MutableCoordinatorLogOffsets extends 
CoordinatorLogOffsets<Offs
     {
         for (long logId : mutations)
         {
-            if (CoordinatorLogId.none().asLong() == logId)
+            if (CoordinatorLogId.NONE_LOG_ID == logId)
                 continue;
 
             Offsets offsets = mutations.offsets(logId);
@@ -42,10 +42,8 @@ public interface MutableCoordinatorLogOffsets extends 
CoordinatorLogOffsets<Offs
 
     default void addAll(Offsets from)
     {
-        if (from.logId().isNone())
-            return;
-
-        from.forEach(this::add);
+        if (!from.logId().isNone())
+            from.forEach(this::add);
     }
 
     static MutableCoordinatorLogOffsets create(boolean assumeExclusive)
diff --git a/src/java/org/apache/cassandra/replication/MutationId.java 
b/src/java/org/apache/cassandra/replication/MutationId.java
index 16c3ba42d8..c8f92e97cf 100644
--- a/src/java/org/apache/cassandra/replication/MutationId.java
+++ b/src/java/org/apache/cassandra/replication/MutationId.java
@@ -26,6 +26,8 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
+import static org.apache.cassandra.replication.CoordinatorLogId.NONE_LOG_ID;
+
 /**
  * Full mutation id, with the addition of timestamp component.
  * <p>
@@ -34,11 +36,10 @@ import org.apache.cassandra.io.util.DataOutputPlus;
  */
 public class MutationId extends ShortMutationId
 {
-    private static final long NONE_LOG_ID = CoordinatorLogId.none().asLong();
-    private static final long NONE_SEQUENCE_ID = Long.MIN_VALUE;
-    private static final int NONE_OFFSET = offset(NONE_SEQUENCE_ID);
-    private static final int NONE_TIMESTAMP = timestamp(NONE_SEQUENCE_ID);
-    private static final MutationId NONE = new MutationId(NONE_LOG_ID, 
NONE_SEQUENCE_ID);
+    static final int NONE_TIMESTAMP = 0;
+    static final long NONE_SEQUENCE_ID = sequenceId(NONE_OFFSET, 
NONE_TIMESTAMP);
+
+    static final MutationId NONE = new MutationId(NONE_LOG_ID, 
NONE_SEQUENCE_ID);
 
     /**
      * 4 byte timestamp. The timestamp is monotonically non-decreasing.
@@ -71,7 +72,7 @@ public class MutationId extends ShortMutationId
 
     public static long sequenceId(int offset, int timestamp)
     {
-        return ((long) offset << 32) | timestamp;
+        return ((long) offset << 32) | (timestamp & 0xffffffffL);
     }
 
     public static int offset(long sequenceId)
@@ -95,17 +96,16 @@ public class MutationId extends ShortMutationId
         return NONE;
     }
 
+    @Override
     public boolean isNone()
     {
-        if (this == NONE)
-            return true;
-        return logId() == NONE_LOG_ID && offset() == NONE_OFFSET && 
timestamp() == NONE_TIMESTAMP;
+        return logId() == NONE_LOG_ID && sequenceId() == NONE_SEQUENCE_ID;
     }
 
     @Override
     public String toString()
     {
-        return "MutationId{" + hostId() + ", " + hostLogId() + ", " + offset() 
+ ", " + timestamp() + '}';
+        return "MutationId{" + hostId + ", " + hostLogId + ", " + offset + ", 
" + timestamp + '}';
     }
 
     /**
@@ -131,10 +131,9 @@ public class MutationId extends ShortMutationId
         long logId = buffer.getLong(pos);
         long sequenceId = buffer.getLong(pos + 8);
 
-        if (logId == MutationId.none().logId() && sequenceId == 
MutationId.none().sequenceId())
-            return MutationId.none();
-
-        return new MutationId(logId, sequenceId);
+        return logId == NONE_LOG_ID && sequenceId == NONE_SEQUENCE_ID
+             ? NONE
+             : new MutationId(logId, sequenceId);
     }
 
     public static class Serializer implements IVersionedSerializer<MutationId>
@@ -151,9 +150,9 @@ public class MutationId extends ShortMutationId
         {
             long logId = in.readLong();
             long sequenceId = in.readLong();
-            if (logId == NONE_LOG_ID && sequenceId == NONE_SEQUENCE_ID)
-                return none();
-            return new MutationId(logId, sequenceId);
+            return logId == NONE_LOG_ID && sequenceId == NONE_SEQUENCE_ID
+                 ? NONE
+                 : new MutationId(logId, sequenceId);
         }
 
         @Override
@@ -167,7 +166,7 @@ public class MutationId extends ShortMutationId
             in.readLong();
             in.readLong();
         }
-    };
+    }
 
     public static Serializer serializer = new Serializer();
 }
diff --git 
a/src/java/org/apache/cassandra/replication/MutationTrackingService.java 
b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index 5d4aec80ca..c901a8c425 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -338,7 +338,7 @@ public class MutationTrackingService
         try
         {
             Preconditions.checkArgument(!mutationId.isNone());
-            Shard shard = getShardNullable(mutationId);
+            Shard shard = getShardNullable(mutationId.asLogId());
             // A response to the coordinator (for a forwarded write) won't 
have the coordinator log matching it
             if (shard != null)
                 shard.receivedWriteResponse(mutationId, fromHost);
@@ -358,10 +358,7 @@ public class MutationTrackingService
             logger.debug("{} receivedActivationAck from {}", 
transfer.logPrefix(), fromHost);
             Preconditions.checkArgument(!transfer.id().isNone());
 
-            // REVIEW: This will be called with ShortMutationId, which 
overrides hashCode from CoordinatorLogId, but map
-            // is updated with CoordinatorLogId; shouldn't call this with a 
ShortMutationId, not sure why that's working
-            // elsewhere
-            Shard shard = getShardNullable(new 
CoordinatorLogId(transfer.id().logId()));
+            Shard shard = getShardNullable(transfer.id().asLogId());
             // Local activation acknowledged in 
MutationTrackingService.activateLocal
             if (shard != null && 
!fromHost.equals(FBUtilities.getBroadcastAddressAndPort()))
                 shard.receivedActivationResponse(transfer, fromHost);
diff --git a/src/java/org/apache/cassandra/replication/OutgoingMutations.java 
b/src/java/org/apache/cassandra/replication/OutgoingMutations.java
index 2733c5b8ca..9c470757e5 100644
--- a/src/java/org/apache/cassandra/replication/OutgoingMutations.java
+++ b/src/java/org/apache/cassandra/replication/OutgoingMutations.java
@@ -107,7 +107,7 @@ class OutgoingMutations
 
         private long key(int hostId, int offset)
         {
-            return ((long) hostId << 32) | offset;
+            return ((long) hostId << 32) | (offset & 0xffffffffL);
         }
 
         private enum OutgoingStatus
diff --git a/src/java/org/apache/cassandra/replication/Shard.java 
b/src/java/org/apache/cassandra/replication/Shard.java
index 9fd098d82f..265cc8b8f3 100644
--- a/src/java/org/apache/cassandra/replication/Shard.java
+++ b/src/java/org/apache/cassandra/replication/Shard.java
@@ -309,7 +309,7 @@ public class Shard
         return getOrCreate(mutation.id());
     }
 
-    private CoordinatorLog getOrCreate(MutationId mutationId)
+    private CoordinatorLog getOrCreate(ShortMutationId mutationId)
     {
         Preconditions.checkArgument(!mutationId.isNone());
         return getOrCreate(mutationId.logId());
diff --git a/src/java/org/apache/cassandra/replication/ShortMutationId.java 
b/src/java/org/apache/cassandra/replication/ShortMutationId.java
index 93ea294cbe..9328764772 100644
--- a/src/java/org/apache/cassandra/replication/ShortMutationId.java
+++ b/src/java/org/apache/cassandra/replication/ShortMutationId.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.replication;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Comparator;
 
 import org.apache.cassandra.db.TypeSizes;
@@ -25,39 +26,67 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
+import static org.apache.cassandra.replication.CoordinatorLogId.NONE_HOST_ID;
+import static 
org.apache.cassandra.replication.CoordinatorLogId.NONE_HOST_LOG_ID;
+import static org.apache.cassandra.replication.CoordinatorLogId.NONE_LOG_ID;
+
 /**
  * MutationId without the timestamp component. This is sufficient for uniquely 
identifying a mutation,
  * and for lookup in the journal and most tracking data structures.
  */
-public class ShortMutationId extends CoordinatorLogId
+public class ShortMutationId implements Serializable
 {
+    static final int NONE_OFFSET = Integer.MIN_VALUE;
+
+    static final ShortMutationId NONE = new ShortMutationId(NONE_LOG_ID, 
NONE_OFFSET);
+
+    /** TCM host ID */
+    public final int hostId;
+
+    /**
+     * Host log ID (unique within the host).
+     * Allocated anew on host restart - one per token range replicated by the 
host.
+     * Persisted on allocation, unique within the host.
+     */
+    public final int hostLogId;
+
     /**
      * 4 byte offset. Offset is incremented, and alone is sufficient to identify
      * the entry within a coordinator log.
      * MutationId adds a timestamp for correlation purposes.
      */
-    protected final int offset;
+    public final int offset;
 
     public ShortMutationId(long logId, int offset)
     {
-        super(logId);
-        this.offset = offset;
+        this(CoordinatorLogId.hostId(logId), 
CoordinatorLogId.hostLogId(logId), offset);
     }
 
     public ShortMutationId(CoordinatorLogId logId, int offset)
     {
-        super(logId.hostId(), logId.hostLogId());
+        this(logId.hostId, logId.hostLogId, offset);
+    }
+
+    private ShortMutationId(int hostId, int hostLogId, int offset)
+    {
+        this.hostId = hostId;
+        this.hostLogId = hostLogId;
         this.offset = offset;
     }
 
     public ShortMutationId(MutationId mutationId)
     {
-        this(mutationId.logId(), mutationId.offset());
+        this(mutationId.hostId(), mutationId.hostLogId(), mutationId.offset());
     }
 
-    public long logId()
+    public int hostId()
+    {
+        return hostId;
+    }
+
+    public int hostLogId()
     {
-        return super.asLong();
+        return hostLogId;
     }
 
     public int offset()
@@ -65,8 +94,23 @@ public class ShortMutationId extends CoordinatorLogId
         return offset;
     }
 
+    public long logId()
+    {
+        return CoordinatorLogId.asLong(hostId, hostLogId);
+    }
+
+    public CoordinatorLogId asLogId()
+    {
+        return new CoordinatorLogId(hostId, hostLogId);
+    }
+
+    public boolean isNone()
+    {
+        return hostId == NONE_HOST_ID && hostLogId == NONE_HOST_LOG_ID && 
offset == NONE_OFFSET;
+    }
+
     @Override
-    public boolean equals(Object o)
+    public final boolean equals(Object o)
     {
         if (this == o) return true;
         if (!(o instanceof ShortMutationId)) return false;
@@ -75,9 +119,9 @@ public class ShortMutationId extends CoordinatorLogId
     }
 
     @Override
-    public int hashCode()
+    public final int hashCode()
     {
-        return Integer.hashCode(offset) + 31 * super.hashCode();
+        return Integer.hashCode(offset) + 31 * Long.hashCode(logId());
     }
 
     @Override
@@ -87,7 +131,7 @@ public class ShortMutationId extends CoordinatorLogId
     }
 
     public static final Comparator<ShortMutationId> comparator = (l, r) -> {
-        int cmp = CoordinatorLogId.comparator.compare(l, r);
+        int cmp = Long.compareUnsigned(l.logId(), r.logId());
         return cmp != 0 ? cmp : Integer.compare(l.offset, r.offset);
     };
 
@@ -105,7 +149,10 @@ public class ShortMutationId extends CoordinatorLogId
         {
             long logId = in.readLong();
             int offset = in.readInt();
-            return new ShortMutationId(logId, offset);
+
+            return (logId == NONE_LOG_ID && offset == NONE_OFFSET)
+                 ? NONE
+                 : new ShortMutationId(logId, offset);
         }
 
         @Override
diff --git 
a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java 
b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
index 83daa5826a..8edfe44804 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
@@ -179,7 +179,7 @@ public class TrackedLocalReads implements 
ExpiredStatePurger.Expireable
         while (transferIds.hasNext())
         {
             ShortMutationId id = transferIds.next();
-            builder.builderForLog(id).unreconciled.add(id.offset());
+            builder.builderForLog(id.asLogId()).unreconciled.add(id.offset());
         }
         return builder.build();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to