frankgh commented on code in PR #78:
URL: https://github.com/apache/cassandra-analytics/pull/78#discussion_r1747575920
##########
scripts/build-dtest-jars.sh:
##########
@@ -118,4 +118,6 @@ else
exit ${RETURN}
fi
done
+ # always delete the Cassandra source after dtest.jar is built to avoid confusing IDE
Review Comment:
can we alternatively exclude these files from the IDE's indexing?
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java:
##########
@@ -22,55 +22,131 @@
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.TreeRangeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse;
+import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaInfo;
import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaMetadata;
-import org.apache.cassandra.spark.bulkwriter.RingInstance;
import org.apache.cassandra.spark.common.model.CassandraInstance;
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.jetbrains.annotations.Nullable;
-// TODO: refactor to improve the return types of methods to use `Instance` instead of String and cleanup
-public class TokenRangeMapping<Instance extends CassandraInstance> implements Serializable
+public class TokenRangeMapping<I extends CassandraInstance> implements Serializable
{
private static final long serialVersionUID = -7284933683815811160L;
+ private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeMapping.class);
+
private final Partitioner partitioner;
private final ReplicationFactor replicationFactor;
- private final transient Set<RingInstance> replacementInstances;
- private final transient RangeMap<BigInteger, List<Instance>> replicasByTokenRange;
- private final transient Multimap<Instance, Range<BigInteger>> tokenRangeMap;
- private final transient Map<String, Set<String>> writeReplicasByDC;
- private final transient Map<String, Set<String>> pendingReplicasByDC;
- private final transient List<ReplicaMetadata> replicaMetadata;
+ private final transient Set<I> allInstances;
+ private final transient RangeMap<BigInteger, List<I>> replicasByTokenRange;
+ private final transient Multimap<I, Range<BigInteger>> tokenRangeMap;
+ private final transient Map<String, Set<I>> writeReplicasByDC;
+ private final transient Map<String, Set<I>> pendingReplicasByDC;
+
+ public static <I extends CassandraInstance>
+ TokenRangeMapping<I> create(Supplier<TokenRangeReplicasResponse> topologySupplier,
+ Supplier<Partitioner> partitionerSupplier,
+ Supplier<ReplicationFactor> replicationFactorSupplier,
+ Function<ReplicaMetadata, I> instanceCreator)
+ {
+ TokenRangeReplicasResponse response = topologySupplier.get();
+ Map<String, I> instanceByIpAddress = new HashMap<>(response.replicaMetadata().size());
+ response.replicaMetadata()
+ .forEach((ipAddress, metadata) -> instanceByIpAddress.put(ipAddress, instanceCreator.apply(metadata)));
+
+ Multimap<I, Range<BigInteger>> tokenRangesByInstance = tokenRangesByInstance(response.writeReplicas(),
+ instanceByIpAddress);
+
+ // Each token range has hosts by DC. We collate them across all ranges into all hosts by DC
+ Map<String, Set<I>> writeReplicasByDC = new HashMap<>();
+ Map<String, Set<I>> pendingReplicasByDC = new HashMap<>();
+ Set<I> allInstances = new HashSet<>(instanceByIpAddress.values());
+ for (I instance : allInstances)
+ {
+ Set<I> dc = writeReplicasByDC.computeIfAbsent(instance.datacenter(), k -> new HashSet<>());
+ dc.add(instance);
+ if (instance.nodeState().isPending)
+ {
+ Set<I> pendingInDc = pendingReplicasByDC.computeIfAbsent(instance.datacenter(), k -> new HashSet<>());
+ pendingInDc.add(instance);
+ }
+ }
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Fetched token-ranges with dcs={}, write_replica_count={}, pending_replica_count={}",
+ writeReplicasByDC.keySet(),
+ writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(),
+ pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size());
+ }
+
+ Map<String, ReplicaMetadata> replicaMetadata = response.replicaMetadata();
Review Comment:
unused?
##########
scripts/build-sidecar.sh:
##########
@@ -77,4 +77,6 @@ else
fi
git clean -fd
./gradlew -Pversion=${SIDECAR_BUILD_VERSION} -Dmaven.repo.local=${SIDECAR_JAR_DIR} publishToMavenLocal
+ # Delete sidecar source after publishing to avoid confusing IDE
+ rm -rf "${SIDECAR_BUILD_DIR}"
Review Comment:
can we alternatively exclude these files from the IDE's indexing?
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java:
##########
@@ -22,55 +22,131 @@
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.TreeRangeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse;
+import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaInfo;
import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaMetadata;
-import org.apache.cassandra.spark.bulkwriter.RingInstance;
import org.apache.cassandra.spark.common.model.CassandraInstance;
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.jetbrains.annotations.Nullable;
-// TODO: refactor to improve the return types of methods to use `Instance` instead of String and cleanup
-public class TokenRangeMapping<Instance extends CassandraInstance> implements Serializable
+public class TokenRangeMapping<I extends CassandraInstance> implements Serializable
{
private static final long serialVersionUID = -7284933683815811160L;
+ private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeMapping.class);
+
private final Partitioner partitioner;
private final ReplicationFactor replicationFactor;
- private final transient Set<RingInstance> replacementInstances;
- private final transient RangeMap<BigInteger, List<Instance>> replicasByTokenRange;
- private final transient Multimap<Instance, Range<BigInteger>> tokenRangeMap;
- private final transient Map<String, Set<String>> writeReplicasByDC;
- private final transient Map<String, Set<String>> pendingReplicasByDC;
- private final transient List<ReplicaMetadata> replicaMetadata;
+ private final transient Set<I> allInstances;
+ private final transient RangeMap<BigInteger, List<I>> replicasByTokenRange;
+ private final transient Multimap<I, Range<BigInteger>> tokenRangeMap;
+ private final transient Map<String, Set<I>> writeReplicasByDC;
+ private final transient Map<String, Set<I>> pendingReplicasByDC;
Review Comment:
I don't think it's worth keeping the replicas grouped by DC; we always end up flattening them. Why not just keep the set instead?
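Something along these lines, for example (just a sketch; the writeReplicas/pendingReplicas field names are hypothetical), reusing the flattening that the debug log in the create() hunk above already does:

    // hypothetical flat fields replacing the per-DC maps
    Set<I> writeReplicas = writeReplicasByDC.values().stream()
                                            .flatMap(Collection::stream)
                                            .collect(Collectors.toSet());
    Set<I> pendingReplicas = pendingReplicasByDC.values().stream()
                                                .flatMap(Collection::stream)
                                                .collect(Collectors.toSet());

If a caller does need the per-DC view, filtering on CassandraInstance#datacenter() could reproduce it on demand.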
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]