yifan-c commented on code in PR #149:
URL: https://github.com/apache/cassandra-analytics/pull/149#discussion_r2666703521
##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraInstance.java:
##########
@@ -34,9 +37,9 @@ public class CassandraInstance implements TokenOwner, Serializable
public static final CassandraInstance.Serializer SERIALIZER = new CassandraInstance.Serializer();
private static final long serialVersionUID = 6767636627576239773L;
- private final String token;
- private final String node;
- private final String dataCenter;
+ private String token;
+ private String node;
+ private String dataCenter;
Review Comment:
Once readObject & writeObject are removed, the `final` modifiers can be
restored.
##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraInstance.java:
##########
@@ -100,6 +103,20 @@ public String toString()
return String.format("{\"token\"=\"%s\", \"node\"=\"%s\", \"dc\"=\"%s\"}", token, node, dataCenter);
}
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
+ {
+ this.token = in.readUTF();
+ this.node = in.readUTF();
+ this.dataCenter = in.readUTF();
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException
+ {
+ out.writeUTF(token());
+ out.writeUTF(nodeName());
+ out.writeUTF(dataCenter());
+ }
Review Comment:
Why add the custom serialization code? The String fields are serializable
by default, so the readObject & writeObject methods are unnecessary.
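To illustrate the point, here is a minimal sketch (not the PR's code) of a class with `final` String fields that round-trips through default Java serialization without any `readObject`/`writeObject` hooks. `InstanceSketch` and `roundTrip` are hypothetical names used only for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for CassandraInstance: final String fields, no custom hooks.
final class InstanceSketch implements Serializable
{
    private static final long serialVersionUID = 1L;

    private final String token;
    private final String node;
    private final String dataCenter;

    InstanceSketch(String token, String node, String dataCenter)
    {
        this.token = token;
        this.node = node;
        this.dataCenter = dataCenter;
    }

    String token()
    {
        return token;
    }

    String node()
    {
        return node;
    }

    String dataCenter()
    {
        return dataCenter;
    }

    // Serializes to a byte array and reads the object back, relying only on default serialization.
    static InstanceSketch roundTrip(InstanceSketch original)
    {
        try
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes))
            {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())))
            {
                return (InstanceSketch) in.readObject();
            }
        }
        catch (IOException | ClassNotFoundException e)
        {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args)
    {
        InstanceSketch copy = roundTrip(new InstanceSketch("0", "node1", "datacenter1"));
        System.out.println(copy.token() + " " + copy.node() + " " + copy.dataCenter());
    }
}
```

Default deserialization populates the `final` fields itself, so the fields do not need to become mutable for the class to stay `Serializable`.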
##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/MurmurHash.java:
##########
@@ -156,4 +156,89 @@ public static long[] hash(ByteBuffer key, int offset, int length, long seed)
return new long[]{h1, h2};
}
+
+ protected static long invRShiftXor(long value, int shift)
Review Comment:
Why are those new methods added?
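For context, and only as a guess from the method name and the test's "reverse hash the token to a blob key" step: MurmurHash3's finalizer uses steps of the form `k ^= k >>> 33`, and such a step is invertible. The sketch below shows one way to undo it. It is not the body from the PR (which is not shown in this hunk), and `XorShiftSketch`, `rShiftXor` and `undoRShiftXor` are hypothetical names.

```java
// Hypothetical illustration of inverting a right-shift-xor step; not the PR's implementation.
final class XorShiftSketch
{
    // Forward step: mixes the high bits into the low bits.
    static long rShiftXor(long value, int shift)
    {
        return value ^ (value >>> shift);
    }

    // Inverse step: xor together mixed >>> 0, mixed >>> shift, mixed >>> 2*shift, ...
    // The terms beyond the first cancel pairwise against the forward step, leaving the original value.
    static long undoRShiftXor(long mixed, int shift)
    {
        if (shift <= 0 || shift >= 64)
        {
            throw new IllegalArgumentException("shift must be in [1, 63]");
        }
        long result = mixed;
        for (int s = shift; s < 64; s += shift)
        {
            result ^= mixed >>> s;
        }
        return result;
    }

    public static void main(String[] args)
    {
        long original = 0x0123456789ABCDEFL;
        long restored = undoRShiftXor(rShiftXor(original, 33), 33);
        System.out.println(restored == original);
    }
}
```

If the new methods exist only to build test keys for a given token, it may be worth considering whether they belong in test code rather than in `MurmurHash` itself.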
##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java:
##########
@@ -74,6 +74,7 @@ public class CassandraRing implements Serializable
private ReplicationFactor replicationFactor;
private List<CassandraInstance> instances;
+ private transient RangeMap<BigInteger, List<CassandraInstance>> replicasForRanges;
private transient RangeMap<BigInteger, List<CassandraInstance>> replicas;
Review Comment:
Having two maps is confusing; if I am not mistaken, they actually point to
the same object.
More importantly, it leads to different `init` code paths. The patch is meant
to fix the broken token calculation, so why leave it half-fixed? The CDC code
path still calls the original constructor, which means CDC is still broken.
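One possible shape, sketched under heavy simplification: keep a single transient derived map and rebuild it through one private accessor, so that every constructor and every deserialized instance goes through the same computation. `RingSketch` is a hypothetical stand-in that uses a `TreeMap` of tokens to node names instead of the real `RangeMap` of replica lists, and its lazy initialization is not thread-safe.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for CassandraRing, reduced to a sorted list of tokens.
final class RingSketch implements Serializable
{
    private static final long serialVersionUID = 1L;

    private final List<Long> tokens;
    // Derived state: never serialized, rebuilt on demand by replicas() below.
    private transient NavigableMap<Long, String> replicas;

    RingSketch(List<Long> tokens)
    {
        if (tokens.isEmpty())
        {
            throw new IllegalArgumentException("at least one token is required");
        }
        this.tokens = new ArrayList<>(tokens);
    }

    // The only place the derived map is computed, whether the instance was constructed or deserialized.
    private NavigableMap<Long, String> replicas()
    {
        if (replicas == null)
        {
            NavigableMap<Long, String> built = new TreeMap<>();
            for (int i = 0; i < tokens.size(); i++)
            {
                built.put(tokens.get(i), "node" + (i + 1));
            }
            replicas = built;
        }
        return replicas;
    }

    // Owner of a token: the first node whose token is >= the given token, wrapping around the ring.
    String ownerOf(long token)
    {
        Map.Entry<Long, String> entry = replicas().ceilingEntry(token);
        return entry != null ? entry.getValue() : replicas().firstEntry().getValue();
    }

    // Serializes and deserializes the ring; the transient map comes back null and is rebuilt lazily.
    static RingSketch roundTrip(RingSketch original)
    {
        try
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes))
            {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())))
            {
                return (RingSketch) in.readObject();
            }
        }
        catch (IOException | ClassNotFoundException e)
        {
            throw new IllegalStateException(e);
        }
    }
}
```

With a single accessor like this, a caller such as the CDC path cannot end up on a stale computation just because it used a different constructor.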
##########
cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTokenRangeReplicasTest.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.spark.data.partitioner.MurmurHash;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+import org.apache.cassandra.testing.utils.ClusterUtils;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
+import static org.apache.cassandra.testing.TestUtils.DC1_RF3_DC2_RF3;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * <h3>Single rack replica placement with NetworkTopologyStrategy</h3>
+ * <ul>
+ * <li>DC1: {Rack1:[Node1, Node2, Node3, Node4]}, Replication Factor: 3</li>
+ * <li>DC2: {Rack1:[Node5, Node6, Node7, Node8]}, Replication Factor: 3</li>
+ * </ul>
+ *
+ * <p>Token range ownership: {Node1: T1, Node2: T2, Node3: T3, Node4: T4, Node5: T5, Node6: T6, Node7: T7, Node8: T8}</p>
+ * <p>T1 will be replicated in the next 2 nodes in the same DC1(Node2, Node3) and the first 3 nodes in DC2(Node5, Node6, Node7).</p>
+ * <p>For each token range the replicas are:</p>
+ * <pre>
+ * T1:[Node1, Node2, Node3, Node5, Node6, Node7]
+ * T2:[Node2, Node3, Node4, Node5, Node6, Node7]
+ * T3:[Node3, Node4, Node1, Node5, Node6, Node7]
+ * T4:[Node4, Node1, Node2, Node5, Node6, Node7]
+ * T5:[Node5, Node6, Node7, Node1, Node2, Node3]
+ * T6:[Node6, Node7, Node8, Node1, Node2, Node3]
+ * T7:[Node7, Node8, Node5, Node1, Node2, Node3]
+ * T8:[Node8, Node5, Node6, Node1, Node2, Node3]
+ * </pre>
+ *
+ * <h3>Multi-rack replica placement with NetworkTopologyStrategy</h3>
+ * <ul>
+ * <li>DC1: {Rack1:[Node1, Node2], Rack2:[Node3], Rack3:[Node4]}, Replication Factor: 3</li>
+ * <li>DC2: {Rack1:[Node5, Node6], Rack2:[Node7], Rack3:[Node8]}, Replication Factor: 3</li>
+ * </ul>
+ *
+ * <p>Cassandra will try to place replicas in different racks.</p>
+ * <p>T1 will be replicated in the next 2 nodes in the same DC1 and different racks (Node3, Node4) and the first 3 nodes
+ * in different racks in DC2(Node5, Node7, Node8).</p>
+ * <p>For each token range the replicas are:</p>
+ * <pre>
+ * T1:[Node1, Node3, Node4, Node5, Node7, Node8]
+ * T2:[Node2, Node3, Node4, Node5, Node7, Node8]
+ * T3:[Node3, Node4, Node1, Node5, Node7, Node8]
+ * T4:[Node4, Node1, Node3, Node5, Node7, Node8]
+ * T5:[Node5, Node7, Node8, Node1, Node3, Node4]
+ * T6:[Node6, Node7, Node8, Node1, Node3, Node4]
+ * T7:[Node7, Node8, Node5, Node1, Node3, Node4]
+ * T8:[Node8, Node5, Node7, Node1, Node3, Node4]
+ * </pre>
+ */
+public class BulkReaderTokenRangeReplicasTest extends SharedClusterSparkIntegrationTestBase
+{
+ QualifiedName table1 = uniqueTestTableFullName(TEST_KEYSPACE);
+ private static final String VALUE1 = "VAL1";
+ private static final String VALUE2 = "VAL2";
+
+ @Override
+ protected ClusterBuilderConfiguration testClusterConfiguration()
+ {
+ return super.testClusterConfiguration()
+ .dcCount(2)
+ .nodesPerDc(4)
+ .dcAndRackSupplier((nodeId) -> {
+ switch (nodeId)
+ {
+ case 1:
+ case 2:
+ return dcAndRack("datacenter1", "rack1");
+ case 3:
+ return dcAndRack("datacenter1", "rack2");
+ case 4:
+ return dcAndRack("datacenter1", "rack3");
+ case 5:
+ case 6:
+ return dcAndRack("datacenter2", "rack1");
+ case 7:
+ return dcAndRack("datacenter2", "rack2");
+ case 8:
+ return dcAndRack("datacenter2", "rack3");
+ default:
+ return dcAndRack("", "");
+ }
+ });
+ }
+
+
+ @Test
+ void testMultiDCMultiRack()
+ {
+ // get token for node 1
+ long token = getTokenForNode(1);
+ // reverse hash the token to a blob key
+ ByteBuffer key = keyForToken(token);
+ // insert value for the key in node 1 token range
+ insert(key, VALUE1);
+ // Nodes placement:
+ // DC1: {rack1: [node1, node2], rack2:[node3], rack3:[node4]}
+ // DC2: {rack1: [node5, node6], rack2:[node7], rack3:[node8]}
+ // validate that all nodes except node 2, 6 stored the key, value.
+ Map<Integer, String> expectedValuesInNodes = new HashMap<>(Map.of(1, VALUE1,
+ 3, VALUE1,
+ 4, VALUE1,
+ 5, VALUE1,
+ 7, VALUE1,
+ 8, VALUE1));
+ validateValuesInNodes(expectedValuesInNodes, key);
+
+ // update the value internally at node 4
+ updateInternal(4, key, VALUE2);
+ // validate the values across the nodes:
+ // node 4 should have VALUE2
+ // node 2, node 6 shouldn't have the key
+ // all other nodes should have VALUE1
+ expectedValuesInNodes.put(4, VALUE2);
+ //updateInternal(1, key, VALUE2);
+ //expectedValuesInNodes.put(1, VALUE2);
Review Comment:
Are these two commented-out lines still wanted?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]