iemejia commented on a change in pull request #10546: [BEAM-9008] Add CassandraIO readAll method
URL: https://github.com/apache/beam/pull/10546#discussion_r405865873
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -318,15 +354,84 @@ private CassandraIO() {}
return builder().setMapperFactoryFn(mapperFactory).build();
}
+ public Read<T> withRingRange(RingRange ringRange) {
+ return withRingRange(ValueProvider.StaticValueProvider.of(ringRange));
+ }
+
+ public Read<T> withRingRange(ValueProvider<RingRange> ringRange) {
+ return builder().setRingRange(ringRange).build();
+ }
+
@Override
public PCollection<T> expand(PBegin input) {
checkArgument((hosts() != null && port() != null), "WithHosts() and
withPort() are required");
checkArgument(keyspace() != null, "withKeyspace() is required");
checkArgument(table() != null, "withTable() is required");
checkArgument(entity() != null, "withEntity() is required");
checkArgument(coder() != null, "withCoder() is required");
+ try (Cluster cluster =
+ getCluster(hosts(), port(), username(), password(), localDc(),
consistencyLevel())) {
+ Integer splitCount = cluster.getMetadata().getAllHosts().size();
+ if (minNumberOfSplits() != null && minNumberOfSplits().get() != null) {
+ splitCount = minNumberOfSplits().get();
+ }
+ ReadAll<T> readAll =
+ CassandraIO.<T>readAll()
+ .withCoder(this.coder())
+ .withConsistencyLevel(this.consistencyLevel())
+ .withEntity(this.entity())
+ .withHosts(this.hosts())
+ .withKeyspace(this.keyspace())
+ .withLocalDc(this.localDc())
+ .withPort(this.port())
+ .withPassword(this.password())
+ .withQuery(this.query())
+ .withTable(this.table())
+ .withUsername(this.username())
+ .withSplitCount(splitCount)
+ .withMapperFactoryFn(this.mapperFactoryFn());
- return input.apply(org.apache.beam.sdk.io.Read.from(new
CassandraSource<>(this, null)));
+ if (isMurmur3Partitioner(cluster)) {
+ LOG.info("Murmur3Partitioner detected, splitting");
+
+ List<BigInteger> tokens =
+ cluster.getMetadata().getTokenRanges().stream()
+ .map(tokenRange -> new
BigInteger(tokenRange.getEnd().getValue().toString()))
+ .collect(Collectors.toList());
+
+ SplitGenerator splitGenerator =
+ new SplitGenerator(cluster.getMetadata().getPartitioner());
+
+ List<Read<T>> splits =
+ splitGenerator.generateSplits(splitCount, tokens).stream()
+ .map(rr -> CassandraIO.<T>read().withRingRange(rr))
+ .collect(Collectors.toList());
+
+ return input.apply("Creating splits",
Create.of(splits)).apply("readAll", readAll);
+
+ } else {
+ LOG.warn(
+ "Only Murmur3Partitioner is supported for splitting, using an
unique source for "
+ + "the read");
+ String partitioner = cluster.getMetadata().getPartitioner();
+ RingRange totalRingRange =
+ new RingRange(
+ SplitGenerator.getRangeMin(partitioner),
SplitGenerator.getRangeMax(partitioner));
+ return input
+
.apply(Create.of(CassandraIO.<T>read().withRingRange(totalRingRange)))
+ .apply(readAll)
+ .setCoder(coder());
+ }
+ }
+ }
+
+ private static class SplitFn<T> extends DoFn<List<RingRange>, Read<T>> {
Review comment:
`private static class SplitFn<T> extends DoFn<Read<T>, Read<T>>`
Note that in the second (output) `Read` you should set up the `RingRange`, so that each
individual `Read` can then be read by a DoFn that knows how to read `Read`
specifications with a `RingRange`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services