vmarquez commented on a change in pull request #10546: [BEAM-9008] Add CassandraIO readAll method
URL: https://github.com/apache/beam/pull/10546#discussion_r405875425
##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -318,15 +354,84 @@ private CassandraIO() {}
return builder().setMapperFactoryFn(mapperFactory).build();
}
+ public Read<T> withRingRange(RingRange ringRange) {
+ return withRingRange(ValueProvider.StaticValueProvider.of(ringRange));
+ }
+
+ public Read<T> withRingRange(ValueProvider<RingRange> ringRange) {
+ return builder().setRingRange(ringRange).build();
+ }
+
@Override
public PCollection<T> expand(PBegin input) {
checkArgument((hosts() != null && port() != null), "WithHosts() and withPort() are required");
checkArgument(keyspace() != null, "withKeyspace() is required");
checkArgument(table() != null, "withTable() is required");
checkArgument(entity() != null, "withEntity() is required");
checkArgument(coder() != null, "withCoder() is required");
+ try (Cluster cluster =
+         getCluster(hosts(), port(), username(), password(), localDc(), consistencyLevel())) {
+ Integer splitCount = cluster.getMetadata().getAllHosts().size();
+ if (minNumberOfSplits() != null && minNumberOfSplits().get() != null) {
+ splitCount = minNumberOfSplits().get();
+ }
+ ReadAll<T> readAll =
+ CassandraIO.<T>readAll()
+ .withCoder(this.coder())
+ .withConsistencyLevel(this.consistencyLevel())
+ .withEntity(this.entity())
+ .withHosts(this.hosts())
+ .withKeyspace(this.keyspace())
+ .withLocalDc(this.localDc())
+ .withPort(this.port())
+ .withPassword(this.password())
+ .withQuery(this.query())
+ .withTable(this.table())
+ .withUsername(this.username())
+ .withSplitCount(splitCount)
+ .withMapperFactoryFn(this.mapperFactoryFn());
-     return input.apply(org.apache.beam.sdk.io.Read.from(new CassandraSource<>(this, null)));
+ if (isMurmur3Partitioner(cluster)) {
+ LOG.info("Murmur3Partitioner detected, splitting");
+
+ List<BigInteger> tokens =
+ cluster.getMetadata().getTokenRanges().stream()
+                 .map(tokenRange -> new BigInteger(tokenRange.getEnd().getValue().toString()))
+ .collect(Collectors.toList());
+
+ SplitGenerator splitGenerator =
+ new SplitGenerator(cluster.getMetadata().getPartitioner());
+
+ List<Read<T>> splits =
##########
Review comment:
Hmm, not sure what you mean. I do Create.of(splits) on line 410; do you want the Create.of() moved to line 406?
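
[Editorial note] For readers following the line-number back-and-forth: the hunk above builds a ReadAll<T> from the single-Read configuration and generates per-token-range splits, and the question is only where the Create.of(splits) wrapping sits before those splits are handed to that transform. Below is a minimal, hedged sketch of that wiring pattern; SplitExpansionSketch, expandWithSplits, and the generic DescT stand-in are illustrative names, not the PR's actual code.

import java.util.List;

import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

// Sketch only: DescT plays the role of the per-split Read<T> descriptors built in
// expand(), and readAll plays the role of the CassandraIO.ReadAll<T> transform
// assembled in the hunk above.
class SplitExpansionSketch {

  static <DescT, T> PCollection<T> expandWithSplits(
      PBegin input,
      List<DescT> splits,
      PTransform<PCollection<DescT>, PCollection<T>> readAll) {
    return input
        // Turn the precomputed split descriptors into a PCollection; this is the
        // Create.of(splits) call discussed in the comment (line 410 vs. 406 is only
        // a question of where this wrapping happens inside expand()).
        // In practice a coder may need to be supplied via .withCoder(...) when the
        // descriptor type cannot be inferred.
        .apply("Create Cassandra splits", Create.of(splits))
        // Each descriptor is then consumed by the ReadAll-style transform.
        .apply("ReadAll from Cassandra", readAll);
  }
}

Either placement appears to produce the same pipeline shape: the ReadAll-style transform consumes a PCollection of read descriptors, so the Create.of(...) wrapping just needs to happen before the readAll apply.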
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services