This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
new 376fe2a9fe Streamline the serialized format for index status gossip
messages
376fe2a9fe is described below
commit 376fe2a9fe3f13c7555c40cda6d3912d55ef63cc
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Tue Nov 12 19:10:15 2024 -0600
Streamline the serialized format for index status gossip messages
patch by Caleb Rackliffe; reviewed by David Capwell for CASSANDRA-20058
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/index/Index.java | 36 +++-
.../apache/cassandra/index/IndexStatusManager.java | 117 +++++++++--
.../test/sai/IndexAvailabilityTest.java | 221 ++++++++++++++-------
.../cassandra/index/IndexStatusManagerTest.java | 28 ++-
5 files changed, 298 insertions(+), 105 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 003d870a25..bb5f13abce 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.0.3
+ * Streamline the serialized format for index status gossip messages
(CASSANDRA-20058)
* Batch clusterings into single SAI partition post-filtering reads
(CASSANDRA-19497)
* Ban the usage of "var" instead of full types in the production code
(CASSANDRA-20038)
* Suppress CVE-2024-45772 from lucene-core-9.7.0.jar (CASSANDRA-20024)
diff --git a/src/java/org/apache/cassandra/index/Index.java
b/src/java/org/apache/cassandra/index/Index.java
index 8abc800e0f..f9e2265631 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -1031,10 +1031,36 @@ public interface Index
*/
enum Status
{
- UNKNOWN,
- FULL_REBUILD_STARTED,
- BUILD_FAILED,
- BUILD_SUCCEEDED,
- DROPPED
+ UNKNOWN(0),
+ FULL_REBUILD_STARTED(1),
+ BUILD_FAILED(2),
+ BUILD_SUCCEEDED(3),
+ DROPPED(4);
+
+ public final int code;
+
+ Status(int code)
+ {
+ this.code = code;
+ }
+
+ static Status fromCode(int code)
+ {
+ switch (code)
+ {
+ case 0:
+ return UNKNOWN;
+ case 1:
+ return FULL_REBUILD_STARTED;
+ case 2:
+ return BUILD_FAILED;
+ case 3:
+ return BUILD_SUCCEEDED;
+ case 4:
+ return DROPPED;
+ }
+
+ throw new IllegalArgumentException("Unrecognized code: " + code);
+ }
}
}
diff --git a/src/java/org/apache/cassandra/index/IndexStatusManager.java
b/src/java/org/apache/cassandra/index/IndexStatusManager.java
index 1c0f5887db..6ba91beb7b 100644
--- a/src/java/org/apache/cassandra/index/IndexStatusManager.java
+++ b/src/java/org/apache/cassandra/index/IndexStatusManager.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -45,6 +46,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JsonUtils;
@@ -73,8 +75,7 @@ public class IndexStatusManager
*/
public final Map<InetAddressAndPort, Map<String, Index.Status>>
peerIndexStatus = new HashMap<>();
- private IndexStatusManager()
- {}
+ private IndexStatusManager() {}
/**
* Remove endpoints whose indexes are not queryable for the specified
{@link Index.QueryPlan}.
@@ -148,27 +149,55 @@ public class IndexStatusManager
if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
return;
- Map<String, String> peerStatus =
JsonUtils.fromJsonMap(versionedValue.value);
- Map<String, Index.Status> indexStatus = new HashMap<>();
+ Map<String, Index.Status> indexStatusMap =
statusMapFromString(versionedValue);
- for (Map.Entry<String, String> e : peerStatus.entrySet())
- {
- String keyspaceIndex = e.getKey();
- Index.Status status = Index.Status.valueOf(e.getValue());
- indexStatus.put(keyspaceIndex, status);
- }
-
- Map<String, Index.Status> oldStatus =
peerIndexStatus.put(endpoint, indexStatus);
- Map<String, Index.Status> updated =
updatedIndexStatuses(oldStatus, indexStatus);
- Set<String> removed = removedIndexStatuses(oldStatus, indexStatus);
+ Map<String, Index.Status> oldStatus =
peerIndexStatus.put(endpoint, indexStatusMap);
+ Map<String, Index.Status> updated =
updatedIndexStatuses(oldStatus, indexStatusMap);
+ Set<String> removed = removedIndexStatuses(oldStatus,
indexStatusMap);
if (!updated.isEmpty() || !removed.isEmpty())
logger.debug("Received index status for peer {}:\n Updated:
{}\n Removed: {}",
endpoint, updated, removed);
}
- catch (MarshalException | IllegalArgumentException e)
+ catch (Exception e)
{
- logger.warn("Unable to parse index status: {}", e.getMessage());
+ logger.error("Unable to parse index status: {}", e.getMessage());
+ }
+ }
+
+ private Map<String, Index.Status> statusMapFromString(VersionedValue
versionedValue)
+ {
+ Map<String, Object> peerStatus =
JsonUtils.fromJsonMap(versionedValue.value);
+ Map<String, Index.Status> indexStatusMap = new HashMap<>();
+
+ for (Map.Entry<String, Object> endpointStatus : peerStatus.entrySet())
+ {
+ String keyspaceOrIndex = endpointStatus.getKey();
+ Object keyspaceOrIndexStatus = endpointStatus.getValue();
+
+ if (keyspaceOrIndexStatus instanceof String)
+ {
+ // This is the legacy format: (fully qualified index name ->
enum string)
+ Index.Status status =
Index.Status.valueOf(keyspaceOrIndexStatus.toString());
+ indexStatusMap.put(keyspaceOrIndex, status);
+ }
+ else if (keyspaceOrIndexStatus instanceof Map)
+ {
+ // This is the new format: (keyspace -> (index -> numeric enum
code))
+ @SuppressWarnings("unchecked")
+ Map<String, Integer> keyspaceIndexStatusMap = (Map<String,
Integer>) keyspaceOrIndexStatus;
+
+ for (Map.Entry<String, Integer> indexStatus :
keyspaceIndexStatusMap.entrySet())
+ {
+ Index.Status status =
Index.Status.fromCode(indexStatus.getValue());
+ indexStatusMap.put(identifier(keyspaceOrIndex,
indexStatus.getKey()), status);
+ }
+ }
+ else
+ {
+ throw new MarshalException("Invalid index status format: " +
endpointStatus);
+ }
}
+ return indexStatusMap;
}
/**
@@ -183,35 +212,79 @@ public class IndexStatusManager
{
try
{
- Map<String, Index.Status> states =
peerIndexStatus.computeIfAbsent(FBUtilities.getBroadcastAddressAndPort(),
+ Map<String, Index.Status> statusMap =
peerIndexStatus.computeIfAbsent(FBUtilities.getBroadcastAddressAndPort(),
k -> new HashMap<>());
String keyspaceIndex = identifier(keyspace, index);
if (status == Index.Status.DROPPED)
- states.remove(keyspaceIndex);
+ statusMap.remove(keyspaceIndex);
else
- states.put(keyspaceIndex, status);
+ statusMap.put(keyspaceIndex, status);
// Don't try and propagate if the gossiper isn't enabled. This is
primarily for tests where the
// Gossiper has not been started. If we attempt to propagate when
not started an exception is
// logged and this causes a number of dtests to fail.
if (Gossiper.instance.isEnabled())
{
- String newStatus =
JsonUtils.JSON_OBJECT_MAPPER.writeValueAsString(states);
+ // Versions 5.0.0 through 5.0.2 use a much more bloated format
that duplicates keyspace names
+ // and writes full status names instead of their numeric
codes. If the minimum cluster version is
+ // unknown or one of those 3 versions, continue to propagate
the old format.
+ CassandraVersion minVersion =
Gossiper.instance.getMinVersion(1, TimeUnit.SECONDS);
+ String newSerializedStatusMap =
shouldWriteLegacyStatusFormat(minVersion) ?
JsonUtils.writeAsJsonString(statusMap)
+
: toSerializedFormat(statusMap);
+
statusPropagationExecutor.submit(() -> {
// schedule gossiper update asynchronously to avoid
potential deadlock when another thread is holding
// gossiper taskLock.
- VersionedValue value =
StorageService.instance.valueFactory.indexStatus(newStatus);
+ VersionedValue value =
StorageService.instance.valueFactory.indexStatus(newSerializedStatusMap);
Gossiper.instance.addLocalApplicationState(ApplicationState.INDEX_STATUS,
value);
});
}
}
- catch (Throwable e)
+ catch (Exception e)
{
logger.warn("Unable to propagate index status: {}",
e.getMessage());
}
}
+ private static boolean shouldWriteLegacyStatusFormat(CassandraVersion
minVersion)
+ {
+ return minVersion == null || (minVersion.major == 5 &&
minVersion.minor == 0 && minVersion.patch < 3);
+ }
+
+ /**
+ * Serializes as a JSON string the status of the indexes in the provided
map.
+ * <p>
+ * For example, the map...
+ * <pre>
+ * {
+ * ks1.cf1_idx1=FULL_REBUILD_STARTED,
+ * ks1.cf1_idx2=FULL_REBUILD_STARTED,
+ * system.PaxosUncommittedIndex=BUILD_SUCCEEDED
+ * }
+ * </pre>
+ * ...will be converted to the string...
+ * <pre>
+ * {
+ * "system": {"PaxosUncommittedIndex": 3},
+ * "ks1": {"cf1_idx1": 1, "cf1_idx2": 1}
+ * }
+ * </pre>
+ */
+ public static String toSerializedFormat(Map<String, Index.Status>
indexStatusMap)
+ {
+ Map<String, Map<String, Integer>> serialized = new HashMap<>();
+
+ for (Map.Entry<String, Index.Status> e : indexStatusMap.entrySet())
+ {
+ String[] keyspaceAndIndex = e.getKey().split("\\.");
+ serialized.computeIfAbsent(keyspaceAndIndex[0], ignore -> new
HashMap<>())
+ .put(keyspaceAndIndex[1], e.getValue().code);
+ }
+
+ return JsonUtils.writeAsJsonString(serialized);
+ }
+
@VisibleForTesting
public synchronized Index.Status getIndexStatus(InetAddressAndPort peer,
String keyspace, String index)
{
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexAvailabilityTest.java
b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexAvailabilityTest.java
index 3a9e111bad..5731281601 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexAvailabilityTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexAvailabilityTest.java
@@ -29,6 +29,9 @@ import java.util.concurrent.TimeUnit;
import com.google.common.base.Objects;
import org.junit.Test;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.test.TestBaseImpl;
@@ -41,6 +44,7 @@ import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.FBUtilities;
+import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static
org.apache.cassandra.distributed.test.sai.SAIUtil.waitForIndexQueryable;
@@ -62,88 +66,117 @@ public class IndexAvailabilityTest extends TestBaseImpl
public void verifyIndexStatusPropagation() throws Exception
{
try (Cluster cluster = init(Cluster.build(2)
- .withConfig(config ->
config.with(GOSSIP)
-
.with(NETWORK))
+ .withConfig(config ->
config.with(GOSSIP).with(NETWORK))
.start()))
{
- String ks1 = "ks1";
- String ks2 = "ks2";
- String ks3 = "ks3";
- String cf1 = "cf1";
- String index1 = "cf1_idx1";
- String index2 = "cf1_idx2";
-
- keyspaces = Arrays.asList(ks1, ks2, ks3);
- indexesPerKs = Arrays.asList(index1, index2);
-
- // create 1 tables per keyspace, 2 indexes per table. all indexes
are queryable
- for (String ks : keyspaces)
- {
- cluster.schemaChange(String.format(CREATE_KEYSPACE, ks, 2));
- cluster.schemaChange(String.format(CREATE_TABLE, ks, cf1));
- cluster.schemaChange(String.format(CREATE_INDEX, index1, ks,
cf1, "v1"));
- cluster.schemaChange(String.format(CREATE_INDEX, index2, ks,
cf1, "v2"));
- waitForIndexQueryable(cluster, ks);
- cluster.forEach(node -> {
- expectedNodeIndexQueryability.put(NodeIndex.create(ks,
index1, node), Index.Status.BUILD_SUCCEEDED);
- expectedNodeIndexQueryability.put(NodeIndex.create(ks,
index2, node), Index.Status.BUILD_SUCCEEDED);
- });
- }
+ verifyIndexStatusPropagation(cluster);
+ }
+ }
- // mark ks1 index1 as non-queryable on node1
- markIndexNonQueryable(cluster.get(1), ks1, cf1, index1);
- // on node2, it observes that node1 ks1.index1 is not queryable
- waitForIndexingStatus(cluster.get(2), ks1, index1, cluster.get(1),
Index.Status.BUILD_FAILED);
- // other indexes or keyspaces should not be affected
- assertIndexingStatus(cluster);
-
- // mark ks2 index2 as non-queryable on node2
- markIndexNonQueryable(cluster.get(2), ks2, cf1, index2);
- // on node1, it observes that node2 ks2.index2 is not queryable
- waitForIndexingStatus(cluster.get(1), ks2, index2, cluster.get(2),
Index.Status.BUILD_FAILED);
- // other indexes or keyspaces should not be affected
- assertIndexingStatus(cluster);
-
- // mark ks1 index1 as queryable on node1
- markIndexQueryable(cluster.get(1), ks1, cf1, index1);
- // on node2, it observes that node1 ks1.index1 is queryable
- waitForIndexingStatus(cluster.get(2), ks1, index1, cluster.get(1),
Index.Status.BUILD_SUCCEEDED);
- // other indexes or keyspaces should not be affected
- assertIndexingStatus(cluster);
-
- // mark ks2 index2 as indexing on node1
- markIndexBuilding(cluster.get(1), ks2, cf1, index2);
- // on node2, it observes that node1 ks2.index2 is not queryable
- waitForIndexingStatus(cluster.get(2), ks2, index2, cluster.get(1),
Index.Status.FULL_REBUILD_STARTED);
- // other indexes or keyspaces should not be affected
- assertIndexingStatus(cluster);
-
- // drop ks1, ks1 index1/index2 should be non queryable on all nodes
- cluster.schemaChange("DROP KEYSPACE " + ks1);
- expectedNodeIndexQueryability.keySet().forEach(k -> {
- if (k.keyspace.equals(ks1))
- expectedNodeIndexQueryability.put(k, Index.Status.UNKNOWN);
- });
- assertIndexingStatus(cluster);
+ @Test
+ public void verifyIndexStatusPropagationMixedPatchVersion() throws
Exception
+ {
+ try (Cluster cluster = init(Cluster.build(2)
+ .withConfig(config ->
config.with(GOSSIP).with(NETWORK))
+
.withInstanceInitializer(MixedPatchVersionHelper::setVersions)
+ .start()))
+ {
+ verifyIndexStatusPropagation(cluster);
+ }
+ }
- // drop ks2 index2, there should be no ks2 index2 status on all
node
- cluster.schemaChange("DROP INDEX " + ks2 + "." + index2);
- expectedNodeIndexQueryability.keySet().forEach(k -> {
- if (k.keyspace.equals(ks2) && k.index.equals(index2))
- expectedNodeIndexQueryability.put(k, Index.Status.UNKNOWN);
- });
- assertIndexingStatus(cluster);
+ @Test
+ public void verifyIndexStatusPropagationMixedMajorVersion() throws
Exception
+ {
+ try (Cluster cluster = init(Cluster.build(2)
+ .withConfig(config ->
config.with(GOSSIP).with(NETWORK))
+
.withInstanceInitializer(MixedMajorVersionHelper::setVersions)
+ .start()))
+ {
+ verifyIndexStatusPropagation(cluster);
+ }
+ }
+
+ private void verifyIndexStatusPropagation(Cluster cluster)
+ {
+ String ks1 = "ks1";
+ String ks2 = "ks2";
+ String ks3 = "ks3";
+ String cf1 = "cf1";
+ String index1 = "cf1_idx1";
+ String index2 = "cf1_idx2";
+
+ keyspaces = Arrays.asList(ks1, ks2, ks3);
+ indexesPerKs = Arrays.asList(index1, index2);
- // drop ks3 cf1, there should be no ks3 index1/index2 status
- cluster.schemaChange("DROP TABLE " + ks3 + "." + cf1);
- expectedNodeIndexQueryability.keySet().forEach(k -> {
- if (k.keyspace.equals(ks3))
- expectedNodeIndexQueryability.put(k, Index.Status.UNKNOWN);
+ // create 1 table per keyspace, 2 indexes per table; all indexes are
queryable
+ for (String ks : keyspaces)
+ {
+ cluster.schemaChange(String.format(CREATE_KEYSPACE, ks, 2));
+ cluster.schemaChange(String.format(CREATE_TABLE, ks, cf1));
+ cluster.schemaChange(String.format(CREATE_INDEX, index1, ks, cf1,
"v1"));
+ cluster.schemaChange(String.format(CREATE_INDEX, index2, ks, cf1,
"v2"));
+ waitForIndexQueryable(cluster, ks);
+ cluster.forEach(node -> {
+ expectedNodeIndexQueryability.put(NodeIndex.create(ks, index1,
node), Index.Status.BUILD_SUCCEEDED);
+ expectedNodeIndexQueryability.put(NodeIndex.create(ks, index2,
node), Index.Status.BUILD_SUCCEEDED);
});
- assertIndexingStatus(cluster);
}
+
+ // mark ks1 index1 as non-queryable on node1
+ markIndexNonQueryable(cluster.get(1), ks1, cf1, index1);
+ // on node2, it observes that node1 ks1.index1 is not queryable
+ waitForIndexingStatus(cluster.get(2), ks1, index1, cluster.get(1),
Index.Status.BUILD_FAILED);
+ // other indexes or keyspaces should not be affected
+ assertIndexingStatus(cluster);
+
+ // mark ks2 index2 as non-queryable on node2
+ markIndexNonQueryable(cluster.get(2), ks2, cf1, index2);
+ // on node1, it observes that node2 ks2.index2 is not queryable
+ waitForIndexingStatus(cluster.get(1), ks2, index2, cluster.get(2),
Index.Status.BUILD_FAILED);
+ // other indexes or keyspaces should not be affected
+ assertIndexingStatus(cluster);
+
+ // mark ks1 index1 as queryable on node1
+ markIndexQueryable(cluster.get(1), ks1, cf1, index1);
+ // on node2, it observes that node1 ks1.index1 is queryable
+ waitForIndexingStatus(cluster.get(2), ks1, index1, cluster.get(1),
Index.Status.BUILD_SUCCEEDED);
+ // other indexes or keyspaces should not be affected
+ assertIndexingStatus(cluster);
+
+ // mark ks2 index2 as indexing on node1
+ markIndexBuilding(cluster.get(1), ks2, cf1, index2);
+ // on node2, it observes that node1 ks2.index2 is not queryable
+ waitForIndexingStatus(cluster.get(2), ks2, index2, cluster.get(1),
Index.Status.FULL_REBUILD_STARTED);
+ // other indexes or keyspaces should not be affected
+ assertIndexingStatus(cluster);
+
+ // drop ks1, ks1 index1/index2 should be non queryable on all nodes
+ cluster.schemaChange("DROP KEYSPACE " + ks1);
+ expectedNodeIndexQueryability.keySet().forEach(k -> {
+ if (k.keyspace.equals(ks1))
+ expectedNodeIndexQueryability.put(k, Index.Status.UNKNOWN);
+ });
+ assertIndexingStatus(cluster);
+
+ // drop ks2 index2, there should be no ks2 index2 status on any node
+ cluster.schemaChange("DROP INDEX " + ks2 + '.' + index2);
+ expectedNodeIndexQueryability.keySet().forEach(k -> {
+ if (k.keyspace.equals(ks2) && k.index.equals(index2))
+ expectedNodeIndexQueryability.put(k, Index.Status.UNKNOWN);
+ });
+ assertIndexingStatus(cluster);
+
+ // drop ks3 cf1, there should be no ks3 index1/index2 status
+ cluster.schemaChange("DROP TABLE " + ks3 + '.' + cf1);
+ expectedNodeIndexQueryability.keySet().forEach(k -> {
+ if (k.keyspace.equals(ks3))
+ expectedNodeIndexQueryability.put(k, Index.Status.UNKNOWN);
+ });
+ assertIndexingStatus(cluster);
}
+ @SuppressWarnings("DataFlowIssue")
private void markIndexNonQueryable(IInvokableInstance node, String
keyspace, String table, String indexName)
{
expectedNodeIndexQueryability.put(NodeIndex.create(keyspace,
indexName, node), Index.Status.BUILD_FAILED);
@@ -155,6 +188,7 @@ public class IndexAvailabilityTest extends TestBaseImpl
});
}
+ @SuppressWarnings("DataFlowIssue")
private void markIndexQueryable(IInvokableInstance node, String keyspace,
String table, String indexName)
{
expectedNodeIndexQueryability.put(NodeIndex.create(keyspace,
indexName, node), Index.Status.BUILD_SUCCEEDED);
@@ -166,6 +200,7 @@ public class IndexAvailabilityTest extends TestBaseImpl
});
}
+ @SuppressWarnings("DataFlowIssue")
private void markIndexBuilding(IInvokableInstance node, String keyspace,
String table, String indexName)
{
expectedNodeIndexQueryability.put(NodeIndex.create(keyspace,
indexName, node), Index.Status.FULL_REBUILD_STARTED);
@@ -285,4 +320,44 @@ public class IndexAvailabilityTest extends TestBaseImpl
return Objects.hashCode(keyspace, index, node);
}
}
+
+ public static class MixedMajorVersionHelper
+ {
+ @SuppressWarnings({ "unused", "resource" })
+ static void setVersions(ClassLoader loader, int node)
+ {
+ if (node == 1)
+ new ByteBuddy().rebase(FBUtilities.class)
+ .method(named("getReleaseVersionString"))
+
.intercept(MethodDelegation.to(MixedMajorVersionHelper.class))
+ .make()
+ .load(loader,
ClassLoadingStrategy.Default.INJECTION);
+ }
+
+ @SuppressWarnings("unused")
+ public static String getReleaseVersionString()
+ {
+ return "4.1.0";
+ }
+ }
+
+ public static class MixedPatchVersionHelper
+ {
+ @SuppressWarnings({ "unused", "resource" })
+ static void setVersions(ClassLoader loader, int node)
+ {
+ if (node == 1)
+ new ByteBuddy().rebase(FBUtilities.class)
+ .method(named("getReleaseVersionString"))
+
.intercept(MethodDelegation.to(MixedPatchVersionHelper.class))
+ .make()
+ .load(loader,
ClassLoadingStrategy.Default.INJECTION);
+ }
+
+ @SuppressWarnings("unused")
+ public static String getReleaseVersionString()
+ {
+ return "5.0.2";
+ }
+ }
}
diff --git a/test/unit/org/apache/cassandra/index/IndexStatusManagerTest.java
b/test/unit/org/apache/cassandra/index/IndexStatusManagerTest.java
index d08fec974a..331e079680 100644
--- a/test/unit/org/apache/cassandra/index/IndexStatusManagerTest.java
+++ b/test/unit/org/apache/cassandra/index/IndexStatusManagerTest.java
@@ -19,12 +19,16 @@
package org.apache.cassandra.index;
import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.apache.cassandra.db.ConsistencyLevel;
@@ -47,8 +51,21 @@ import static org.apache.cassandra.locator.ReplicaUtils.full;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertArrayEquals;
+@RunWith(Parameterized.class)
public class IndexStatusManagerTest
{
+ @Parameterized.Parameter
+ public boolean legacyStatusFormat;
+
+ @Parameterized.Parameters(name = "{0}")
+ public static List<Object[]> parameters()
+ {
+ List<Object[]> parameters = new ArrayList<>();
+ parameters.add(new Object[] { true });
+ parameters.add(new Object[] { false });
+ return parameters;
+ }
+
static class Testcase
{
String keyspace;
@@ -389,11 +406,12 @@ public class IndexStatusManagerTest
.collect(Collectors.toSet());
// send indexStatus for each endpoint
- testcase.indexStatus.forEach((endpoint, indexStatus) ->
- IndexStatusManager.instance.receivePeerIndexStatus(
- endpoint,
-
VersionedValue.unsafeMakeVersionedValue(JsonUtils.writeAsJsonString(indexStatus),
1)
- ));
+ testcase.indexStatus.forEach((endpoint, indexStatusMap) ->
+ {
+ String serialized = legacyStatusFormat ?
JsonUtils.writeAsJsonString(indexStatusMap)
+ :
IndexStatusManager.toSerializedFormat(indexStatusMap);
+ IndexStatusManager.instance.receivePeerIndexStatus(endpoint,
VersionedValue.unsafeMakeVersionedValue(serialized, 1));
+ });
// sort the replicas here, so that we can assert the order later
EndpointsForRange endpoints = EndpointsForRange.copyOf(new
TreeSet<>(replicas));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]