This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
     new 376fe2a9fe Streamline the serialized format for index status gossip messages
376fe2a9fe is described below

commit 376fe2a9fe3f13c7555c40cda6d3912d55ef63cc
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Tue Nov 12 19:10:15 2024 -0600

    Streamline the serialized format for index status gossip messages
    
    patch by Caleb Rackliffe; reviewed by David Capwell for CASSANDRA-20058
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/index/Index.java     |  36 +++-
 .../apache/cassandra/index/IndexStatusManager.java | 117 +++++++++--
 .../test/sai/IndexAvailabilityTest.java            | 221 ++++++++++++++-------
 .../cassandra/index/IndexStatusManagerTest.java    |  28 ++-
 5 files changed, 298 insertions(+), 105 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 003d870a25..bb5f13abce 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0.3
+ * Streamline the serialized format for index status gossip messages (CASSANDRA-20058)
  * Batch clusterings into single SAI partition post-filtering reads (CASSANDRA-19497)
  * Ban the usage of "var" instead of full types in the production code (CASSANDRA-20038)
  * Suppress CVE-2024-45772 from lucene-core-9.7.0.jar (CASSANDRA-20024)
diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java
index 8abc800e0f..f9e2265631 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -1031,10 +1031,36 @@ public interface Index
      */
     enum Status
     {
-        UNKNOWN,
-        FULL_REBUILD_STARTED,
-        BUILD_FAILED,
-        BUILD_SUCCEEDED,
-        DROPPED
+        UNKNOWN(0),
+        FULL_REBUILD_STARTED(1),
+        BUILD_FAILED(2),
+        BUILD_SUCCEEDED(3),
+        DROPPED(4);
+
+        public final int code;
+
+        Status(int code)
+        {
+            this.code = code;
+        }
+
+        static Status fromCode(int code)
+        {
+            switch (code)
+            {
+                case 0:
+                    return UNKNOWN;
+                case 1:
+                    return FULL_REBUILD_STARTED;
+                case 2:
+                    return BUILD_FAILED;
+                case 3:
+                    return BUILD_SUCCEEDED;
+                case 4:
+                    return DROPPED;
+            }
+
+            throw new IllegalArgumentException("Unrecognized code: " + code);
+        }
     }
 }
diff --git a/src/java/org/apache/cassandra/index/IndexStatusManager.java b/src/java/org/apache/cassandra/index/IndexStatusManager.java
index 1c0f5887db..6ba91beb7b 100644
--- a/src/java/org/apache/cassandra/index/IndexStatusManager.java
+++ b/src/java/org/apache/cassandra/index/IndexStatusManager.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -45,6 +46,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.CassandraVersion;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JsonUtils;
 
@@ -73,8 +75,7 @@ public class IndexStatusManager
      */
     public final Map<InetAddressAndPort, Map<String, Index.Status>> peerIndexStatus = new HashMap<>();
 
-    private IndexStatusManager()
-    {}
+    private IndexStatusManager() {}
 
     /**
      * Remove endpoints whose indexes are not queryable for the specified {@link Index.QueryPlan}.
@@ -148,27 +149,55 @@ public class IndexStatusManager
             if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
                 return;
 
-            Map<String, String> peerStatus = JsonUtils.fromJsonMap(versionedValue.value);
-            Map<String, Index.Status> indexStatus = new HashMap<>();
+            Map<String, Index.Status> indexStatusMap = statusMapFromString(versionedValue);
 
-            for (Map.Entry<String, String> e : peerStatus.entrySet())
-            {
-                String keyspaceIndex = e.getKey();
-                Index.Status status = Index.Status.valueOf(e.getValue());
-                indexStatus.put(keyspaceIndex, status);
-            }
-
-            Map<String, Index.Status> oldStatus = peerIndexStatus.put(endpoint, indexStatus);
-            Map<String, Index.Status> updated = updatedIndexStatuses(oldStatus, indexStatus);
-            Set<String> removed = removedIndexStatuses(oldStatus, indexStatus);
+            Map<String, Index.Status> oldStatus = peerIndexStatus.put(endpoint, indexStatusMap);
+            Map<String, Index.Status> updated = updatedIndexStatuses(oldStatus, indexStatusMap);
+            Set<String> removed = removedIndexStatuses(oldStatus, indexStatusMap);
             if (!updated.isEmpty() || !removed.isEmpty())
                 logger.debug("Received index status for peer {}:\n    Updated: {}\n    Removed: {}",
                              endpoint, updated, removed);
         }
-        catch (MarshalException | IllegalArgumentException e)
+        catch (Exception e)
         {
-            logger.warn("Unable to parse index status: {}", e.getMessage());
+            logger.error("Unable to parse index status: {}", e.getMessage());
+        }
+    }
+
+    private Map<String, Index.Status> statusMapFromString(VersionedValue versionedValue)
+    {
+        Map<String, Object> peerStatus = JsonUtils.fromJsonMap(versionedValue.value);
+        Map<String, Index.Status> indexStatusMap = new HashMap<>();
+
+        for (Map.Entry<String, Object> endpointStatus : peerStatus.entrySet())
+        {
+            String keyspaceOrIndex = endpointStatus.getKey();
+            Object keyspaceOrIndexStatus = endpointStatus.getValue();
+
+            if (keyspaceOrIndexStatus instanceof String)
+            {
+                // This is the legacy format: (fully qualified index name -> enum string)
+                Index.Status status = Index.Status.valueOf(keyspaceOrIndexStatus.toString());
+                indexStatusMap.put(keyspaceOrIndex, status);
+            }
+            else if (keyspaceOrIndexStatus instanceof Map)
+            {
+                // This is the new format. (keyspace -> (index -> numeric enum code)) 
+                @SuppressWarnings("unchecked") 
+                Map<String, Integer> keyspaceIndexStatusMap = (Map<String, Integer>) keyspaceOrIndexStatus;
+
+                for (Map.Entry<String, Integer> indexStatus : keyspaceIndexStatusMap.entrySet())
+                {
+                    Index.Status status = Index.Status.fromCode(indexStatus.getValue());
+                    indexStatusMap.put(identifier(keyspaceOrIndex, indexStatus.getKey()), status);
+                }
+            }
+            else
+            {
+                throw new MarshalException("Invalid index status format: " + endpointStatus);
+            }
         }
+        return indexStatusMap;
     }
 
     /**
@@ -183,35 +212,79 @@ public class IndexStatusManager
     {
         try
         {
-            Map<String, Index.Status> states = peerIndexStatus.computeIfAbsent(FBUtilities.getBroadcastAddressAndPort(),
+            Map<String, Index.Status> statusMap = peerIndexStatus.computeIfAbsent(FBUtilities.getBroadcastAddressAndPort(),
                                                                                k -> new HashMap<>());
             String keyspaceIndex = identifier(keyspace, index);
 
             if (status == Index.Status.DROPPED)
-                states.remove(keyspaceIndex);
+                statusMap.remove(keyspaceIndex);
             else
-                states.put(keyspaceIndex, status);
+                statusMap.put(keyspaceIndex, status);
 
             // Don't try and propagate if the gossiper isn't enabled. This is primarily for tests where the
             // Gossiper has not been started. If we attempt to propagate when not started an exception is
             // logged and this causes a number of dtests to fail.
             if (Gossiper.instance.isEnabled())
             {
-                String newStatus = JsonUtils.JSON_OBJECT_MAPPER.writeValueAsString(states);
+                // Versions 5.0.0 through 5.0.2 use a much more bloated format that duplicates keyspace names
+                // and writes full status names instead of their numeric codes. If the minimum cluster version is
+                // unknown or one of those 3 versions, continue to propagate the old format.
+                CassandraVersion minVersion = Gossiper.instance.getMinVersion(1, TimeUnit.SECONDS);
+                String newSerializedStatusMap = shouldWriteLegacyStatusFormat(minVersion) ? JsonUtils.writeAsJsonString(statusMap) 
+                                                                                          : toSerializedFormat(statusMap);
+
                 statusPropagationExecutor.submit(() -> {
                     // schedule gossiper update asynchronously to avoid potential deadlock when another thread is holding
                     // gossiper taskLock.
-                    VersionedValue value = StorageService.instance.valueFactory.indexStatus(newStatus);
+                    VersionedValue value = StorageService.instance.valueFactory.indexStatus(newSerializedStatusMap);
                     Gossiper.instance.addLocalApplicationState(ApplicationState.INDEX_STATUS, value);
                 });
             }
         }
-        catch (Throwable e)
+        catch (Exception e)
         {
             logger.warn("Unable to propagate index status: {}", e.getMessage());
         }
     }
 
+    private static boolean shouldWriteLegacyStatusFormat(CassandraVersion minVersion)
+    {
+        return minVersion == null || (minVersion.major == 5 && minVersion.minor == 0 && minVersion.patch < 3);
+    }
+
+    /**
+     * Serializes as a JSON string the status of the indexes in the provided map.
+     * <p> 
+     * For example, the map...
+     * <pre>
+     * {
+     *     ks1.cf1_idx1=FULL_REBUILD_STARTED,
+     *     ks1.cf1_idx2=FULL_REBUILD_STARTED,
+     *     system.PaxosUncommittedIndex=BUILD_SUCCEEDED
+     * }
+     * </pre>
+     * ...will be converted to the string...
+     * <pre>
+     * {
+     *     "system": {"PaxosUncommittedIndex": 3},
+     *     "ks1": {"cf1_idx1": 1, "cf1_idx2": 1}
+     * }
+     * </pre>
+     */
+    public static String toSerializedFormat(Map<String, Index.Status> indexStatusMap)
+    {
+        Map<String, Map<String, Integer>> serialized = new HashMap<>();
+
+        for (Map.Entry<String, Index.Status> e : indexStatusMap.entrySet())
+        {
+            String[] keyspaceAndIndex = e.getKey().split("\\.");
+            serialized.computeIfAbsent(keyspaceAndIndex[0], ignore -> new HashMap<>())
+                      .put(keyspaceAndIndex[1], e.getValue().code);
+        }
+
+        return JsonUtils.writeAsJsonString(serialized);
+    }
+
     @VisibleForTesting
     public synchronized Index.Status getIndexStatus(InetAddressAndPort peer, String keyspace, String index)
     {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexAvailabilityTest.java b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexAvailabilityTest.java
index 3a9e111bad..5731281601 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexAvailabilityTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexAvailabilityTest.java
@@ -29,6 +29,9 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.base.Objects;
 import org.junit.Test;
 
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
@@ -41,6 +44,7 @@ import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static net.bytebuddy.matcher.ElementMatchers.named;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 import static org.apache.cassandra.distributed.test.sai.SAIUtil.waitForIndexQueryable;
@@ -62,88 +66,117 @@ public class IndexAvailabilityTest extends TestBaseImpl
     public void verifyIndexStatusPropagation() throws Exception
     {
         try (Cluster cluster = init(Cluster.build(2)
-                                           .withConfig(config -> config.with(GOSSIP)
-                                                                       .with(NETWORK))
+                                           .withConfig(config -> config.with(GOSSIP).with(NETWORK))
                                            .start()))
         {
-            String ks1 = "ks1";
-            String ks2 = "ks2";
-            String ks3 = "ks3";
-            String cf1 = "cf1";
-            String index1 = "cf1_idx1";
-            String index2 = "cf1_idx2";
-
-            keyspaces = Arrays.asList(ks1, ks2, ks3);
-            indexesPerKs = Arrays.asList(index1, index2);
-
-            // create 1 tables per keyspace, 2 indexes per table. all indexes are queryable
-            for (String ks : keyspaces)
-            {
-                cluster.schemaChange(String.format(CREATE_KEYSPACE, ks, 2));
-                cluster.schemaChange(String.format(CREATE_TABLE, ks, cf1));
-                cluster.schemaChange(String.format(CREATE_INDEX, index1, ks, cf1, "v1"));
-                cluster.schemaChange(String.format(CREATE_INDEX, index2, ks, cf1, "v2"));
-                waitForIndexQueryable(cluster, ks);
-                cluster.forEach(node -> {
-                    expectedNodeIndexQueryability.put(NodeIndex.create(ks, index1, node), Index.Status.BUILD_SUCCEEDED);
-                    expectedNodeIndexQueryability.put(NodeIndex.create(ks, index2, node), Index.Status.BUILD_SUCCEEDED);
-                });
-            }
+            verifyIndexStatusPropagation(cluster);
+        }
+    }
 
-            // mark ks1 index1 as non-queryable on node1
-            markIndexNonQueryable(cluster.get(1), ks1, cf1, index1);
-            // on node2, it observes that node1 ks1.index1 is not queryable
-            waitForIndexingStatus(cluster.get(2), ks1, index1, cluster.get(1), Index.Status.BUILD_FAILED);
-            // other indexes or keyspaces should not be affected
-            assertIndexingStatus(cluster);
-
-            // mark ks2 index2 as non-queryable on node2
-            markIndexNonQueryable(cluster.get(2), ks2, cf1, index2);
-            // on node1, it observes that node2 ks2.index2 is not queryable
-            waitForIndexingStatus(cluster.get(1), ks2, index2, cluster.get(2), Index.Status.BUILD_FAILED);
-            // other indexes or keyspaces should not be affected
-            assertIndexingStatus(cluster);
-
-            // mark ks1 index1 as queryable on node1
-            markIndexQueryable(cluster.get(1), ks1, cf1, index1);
-            // on node2, it observes that node1 ks1.index1 is queryable
-            waitForIndexingStatus(cluster.get(2), ks1, index1, cluster.get(1), Index.Status.BUILD_SUCCEEDED);
-            // other indexes or keyspaces should not be affected
-            assertIndexingStatus(cluster);
-
-            // mark ks2 index2 as indexing on node1
-            markIndexBuilding(cluster.get(1), ks2, cf1, index2);
-            // on node2, it observes that node1 ks2.index2 is not queryable
-            waitForIndexingStatus(cluster.get(2), ks2, index2, cluster.get(1), Index.Status.FULL_REBUILD_STARTED);
-            // other indexes or keyspaces should not be affected
-            assertIndexingStatus(cluster);
-
-            // drop ks1, ks1 index1/index2 should be non queryable on all nodes
-            cluster.schemaChange("DROP KEYSPACE " + ks1);
-            expectedNodeIndexQueryability.keySet().forEach(k -> {
-                if (k.keyspace.equals(ks1))
-                    expectedNodeIndexQueryability.put(k, Index.Status.UNKNOWN);
-            });
-            assertIndexingStatus(cluster);
+    @Test
+    public void verifyIndexStatusPropagationMixedPatchVersion() throws Exception
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(config -> config.with(GOSSIP).with(NETWORK))
+                                           .withInstanceInitializer(MixedPatchVersionHelper::setVersions)
+                                           .start()))
+        {
+            verifyIndexStatusPropagation(cluster);
+        }
+    }
 
-            // drop ks2 index2, there should be no ks2 index2 status on all node
-            cluster.schemaChange("DROP INDEX " + ks2 + "." + index2);
-            expectedNodeIndexQueryability.keySet().forEach(k -> {
-                if (k.keyspace.equals(ks2) && k.index.equals(index2))
-                    expectedNodeIndexQueryability.put(k, Index.Status.UNKNOWN);
-            });
-            assertIndexingStatus(cluster);
+    @Test
+    public void verifyIndexStatusPropagationMixedMajorVersion() throws Exception
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(config -> config.with(GOSSIP).with(NETWORK))
+                                           .withInstanceInitializer(MixedMajorVersionHelper::setVersions)
+                                           .start()))
+        {
+            verifyIndexStatusPropagation(cluster);
+        }
+    }
+
+    private void verifyIndexStatusPropagation(Cluster cluster)
+    {
+        String ks1 = "ks1";
+        String ks2 = "ks2";
+        String ks3 = "ks3";
+        String cf1 = "cf1";
+        String index1 = "cf1_idx1";
+        String index2 = "cf1_idx2";
+
+        keyspaces = Arrays.asList(ks1, ks2, ks3);
+        indexesPerKs = Arrays.asList(index1, index2);
 
-            // drop ks3 cf1, there should be no ks3 index1/index2 status
-            cluster.schemaChange("DROP TABLE " + ks3 + "." + cf1);
-            expectedNodeIndexQueryability.keySet().forEach(k -> {
-                if (k.keyspace.equals(ks3))
-                    expectedNodeIndexQueryability.put(k, Index.Status.UNKNOWN);
+        // create 1 tables per keyspace, 2 indexes per table. all indexes are queryable
+        for (String ks : keyspaces)
+        {
+            cluster.schemaChange(String.format(CREATE_KEYSPACE, ks, 2));
+            cluster.schemaChange(String.format(CREATE_TABLE, ks, cf1));
+            cluster.schemaChange(String.format(CREATE_INDEX, index1, ks, cf1, "v1"));
+            cluster.schemaChange(String.format(CREATE_INDEX, index2, ks, cf1, "v2"));
+            waitForIndexQueryable(cluster, ks);
+            cluster.forEach(node -> {
+                expectedNodeIndexQueryability.put(NodeIndex.create(ks, index1, node), Index.Status.BUILD_SUCCEEDED);
+                expectedNodeIndexQueryability.put(NodeIndex.create(ks, index2, node), Index.Status.BUILD_SUCCEEDED);
             });
-            assertIndexingStatus(cluster);
         }
+
+        // mark ks1 index1 as non-queryable on node1
+        markIndexNonQueryable(cluster.get(1), ks1, cf1, index1);
+        // on node2, it observes that node1 ks1.index1 is not queryable
+        waitForIndexingStatus(cluster.get(2), ks1, index1, cluster.get(1), Index.Status.BUILD_FAILED);
+        // other indexes or keyspaces should not be affected
+        assertIndexingStatus(cluster);
+
+        // mark ks2 index2 as non-queryable on node2
+        markIndexNonQueryable(cluster.get(2), ks2, cf1, index2);
+        // on node1, it observes that node2 ks2.index2 is not queryable
+        waitForIndexingStatus(cluster.get(1), ks2, index2, cluster.get(2), Index.Status.BUILD_FAILED);
+        // other indexes or keyspaces should not be affected
+        assertIndexingStatus(cluster);
+
+        // mark ks1 index1 as queryable on node1
+        markIndexQueryable(cluster.get(1), ks1, cf1, index1);
+        // on node2, it observes that node1 ks1.index1 is queryable
+        waitForIndexingStatus(cluster.get(2), ks1, index1, cluster.get(1), Index.Status.BUILD_SUCCEEDED);
+        // other indexes or keyspaces should not be affected
+        assertIndexingStatus(cluster);
+
+        // mark ks2 index2 as indexing on node1
+        markIndexBuilding(cluster.get(1), ks2, cf1, index2);
+        // on node2, it observes that node1 ks2.index2 is not queryable
+        waitForIndexingStatus(cluster.get(2), ks2, index2, cluster.get(1), Index.Status.FULL_REBUILD_STARTED);
+        // other indexes or keyspaces should not be affected
+        assertIndexingStatus(cluster);
+
+        // drop ks1, ks1 index1/index2 should be non queryable on all nodes
+        cluster.schemaChange("DROP KEYSPACE " + ks1);
+        expectedNodeIndexQueryability.keySet().forEach(k -> {
+            if (k.keyspace.equals(ks1))
+                expectedNodeIndexQueryability.put(k, Index.Status.UNKNOWN);
+        });
+        assertIndexingStatus(cluster);
+
+        // drop ks2 index2, there should be no ks2 index2 status on all node
+        cluster.schemaChange("DROP INDEX " + ks2 + '.' + index2);
+        expectedNodeIndexQueryability.keySet().forEach(k -> {
+            if (k.keyspace.equals(ks2) && k.index.equals(index2))
+                expectedNodeIndexQueryability.put(k, Index.Status.UNKNOWN);
+        });
+        assertIndexingStatus(cluster);
+
+        // drop ks3 cf1, there should be no ks3 index1/index2 status
+        cluster.schemaChange("DROP TABLE " + ks3 + '.' + cf1);
+        expectedNodeIndexQueryability.keySet().forEach(k -> {
+            if (k.keyspace.equals(ks3))
+                expectedNodeIndexQueryability.put(k, Index.Status.UNKNOWN);
+        });
+        assertIndexingStatus(cluster);
     }
 
+    @SuppressWarnings("DataFlowIssue")
     private void markIndexNonQueryable(IInvokableInstance node, String keyspace, String table, String indexName)
     {
         expectedNodeIndexQueryability.put(NodeIndex.create(keyspace, indexName, node), Index.Status.BUILD_FAILED);
@@ -155,6 +188,7 @@ public class IndexAvailabilityTest extends TestBaseImpl
         });
     }
 
+    @SuppressWarnings("DataFlowIssue")
     private void markIndexQueryable(IInvokableInstance node, String keyspace, String table, String indexName)
     {
         expectedNodeIndexQueryability.put(NodeIndex.create(keyspace, indexName, node), Index.Status.BUILD_SUCCEEDED);
@@ -166,6 +200,7 @@ public class IndexAvailabilityTest extends TestBaseImpl
         });
     }
 
+    @SuppressWarnings("DataFlowIssue")
     private void markIndexBuilding(IInvokableInstance node, String keyspace, String table, String indexName)
     {
         expectedNodeIndexQueryability.put(NodeIndex.create(keyspace, indexName, node), Index.Status.FULL_REBUILD_STARTED);
@@ -285,4 +320,44 @@ public class IndexAvailabilityTest extends TestBaseImpl
             return Objects.hashCode(keyspace, index, node);
         }
     }
+
+    public static class MixedMajorVersionHelper
+    {
+        @SuppressWarnings({ "unused", "resource" })
+        static void setVersions(ClassLoader loader, int node)
+        {
+            if (node == 1)
+                new ByteBuddy().rebase(FBUtilities.class)
+                               .method(named("getReleaseVersionString"))
+                               .intercept(MethodDelegation.to(MixedMajorVersionHelper.class))
+                               .make()
+                               .load(loader, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        @SuppressWarnings("unused")
+        public static String getReleaseVersionString()
+        {
+            return "4.1.0";
+        }
+    }
+
+    public static class MixedPatchVersionHelper
+    {
+        @SuppressWarnings({ "unused", "resource" })
+        static void setVersions(ClassLoader loader, int node)
+        {
+            if (node == 1)
+                new ByteBuddy().rebase(FBUtilities.class)
+                               .method(named("getReleaseVersionString"))
+                               .intercept(MethodDelegation.to(MixedPatchVersionHelper.class))
+                               .make()
+                               .load(loader, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        @SuppressWarnings("unused")
+        public static String getReleaseVersionString()
+        {
+            return "5.0.2";
+        }
+    }
 }
diff --git a/test/unit/org/apache/cassandra/index/IndexStatusManagerTest.java b/test/unit/org/apache/cassandra/index/IndexStatusManagerTest.java
index d08fec974a..331e079680 100644
--- a/test/unit/org/apache/cassandra/index/IndexStatusManagerTest.java
+++ b/test/unit/org/apache/cassandra/index/IndexStatusManagerTest.java
@@ -19,12 +19,16 @@
 package org.apache.cassandra.index;
 
 import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.mockito.Mockito;
 
 import org.apache.cassandra.db.ConsistencyLevel;
@@ -47,8 +51,21 @@ import static org.apache.cassandra.locator.ReplicaUtils.full;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertArrayEquals;
 
+@RunWith(Parameterized.class)
 public class IndexStatusManagerTest
 {
+    @Parameterized.Parameter
+    public boolean legacyStatusFormat;
+
+    @Parameterized.Parameters(name = "{0}")
+    public static List<Object[]> parameters()
+    {
+        List<Object[]> parameters = new ArrayList<>();
+        parameters.add(new Object[] { true });
+        parameters.add(new Object[] { false });
+        return parameters;
+    }
+
     static class Testcase
     {
         String keyspace;
@@ -389,11 +406,12 @@ public class IndexStatusManagerTest
                 .collect(Collectors.toSet());
 
         // send indexStatus for each endpoint
-        testcase.indexStatus.forEach((endpoint, indexStatus) ->
-                IndexStatusManager.instance.receivePeerIndexStatus(
-                        endpoint,
-                        VersionedValue.unsafeMakeVersionedValue(JsonUtils.writeAsJsonString(indexStatus), 1)
-                ));
+        testcase.indexStatus.forEach((endpoint, indexStatusMap) ->
+        {
+            String serialized = legacyStatusFormat ? JsonUtils.writeAsJsonString(indexStatusMap) 
+                                                   : IndexStatusManager.toSerializedFormat(indexStatusMap);
+            IndexStatusManager.instance.receivePeerIndexStatus(endpoint, VersionedValue.unsafeMakeVersionedValue(serialized, 1));
+        });
 
         // sort the replicas here, so that we can assert the order later
         EndpointsForRange endpoints = EndpointsForRange.copyOf(new TreeSet<>(replicas));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to