iemejia commented on a change in pull request #10546: [BEAM-9008] Add 
CassandraIO readAll method
URL: https://github.com/apache/beam/pull/10546#discussion_r405865873
 
 

 ##########
 File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
 ##########
 @@ -318,15 +354,84 @@ private CassandraIO() {}
       return builder().setMapperFactoryFn(mapperFactory).build();
     }
 
+    public Read<T> withRingRange(RingRange ringRange) {
+      return withRingRange(ValueProvider.StaticValueProvider.of(ringRange));
+    }
+
+    public Read<T> withRingRange(ValueProvider<RingRange> ringRange) {
+      return builder().setRingRange(ringRange).build();
+    }
+
     @Override
     public PCollection<T> expand(PBegin input) {
       checkArgument((hosts() != null && port() != null), "WithHosts() and 
withPort() are required");
       checkArgument(keyspace() != null, "withKeyspace() is required");
       checkArgument(table() != null, "withTable() is required");
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
+      try (Cluster cluster =
+          getCluster(hosts(), port(), username(), password(), localDc(), 
consistencyLevel())) {
+        Integer splitCount = cluster.getMetadata().getAllHosts().size();
+        if (minNumberOfSplits() != null && minNumberOfSplits().get() != null) {
+          splitCount = minNumberOfSplits().get();
+        }
+        ReadAll<T> readAll =
+            CassandraIO.<T>readAll()
+                .withCoder(this.coder())
+                .withConsistencyLevel(this.consistencyLevel())
+                .withEntity(this.entity())
+                .withHosts(this.hosts())
+                .withKeyspace(this.keyspace())
+                .withLocalDc(this.localDc())
+                .withPort(this.port())
+                .withPassword(this.password())
+                .withQuery(this.query())
+                .withTable(this.table())
+                .withUsername(this.username())
+                .withSplitCount(splitCount)
+                .withMapperFactoryFn(this.mapperFactoryFn());
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new 
CassandraSource<>(this, null)));
+        if (isMurmur3Partitioner(cluster)) {
+          LOG.info("Murmur3Partitioner detected, splitting");
+
+          List<BigInteger> tokens =
+              cluster.getMetadata().getTokenRanges().stream()
+                  .map(tokenRange -> new 
BigInteger(tokenRange.getEnd().getValue().toString()))
+                  .collect(Collectors.toList());
+
+          SplitGenerator splitGenerator =
+              new SplitGenerator(cluster.getMetadata().getPartitioner());
+
+          List<Read<T>> splits =
+              splitGenerator.generateSplits(splitCount, tokens).stream()
+                  .map(rr -> CassandraIO.<T>read().withRingRange(rr))
+                  .collect(Collectors.toList());
+
+          return input.apply("Creating splits", 
Create.of(splits)).apply("readAll", readAll);
+
+        } else {
+          LOG.warn(
+              "Only Murmur3Partitioner is supported for splitting, using an 
unique source for "
+                  + "the read");
+          String partitioner = cluster.getMetadata().getPartitioner();
+          RingRange totalRingRange =
+              new RingRange(
+                  SplitGenerator.getRangeMin(partitioner), 
SplitGenerator.getRangeMax(partitioner));
+          return input
+              
.apply(Create.of(CassandraIO.<T>read().withRingRange(totalRingRange)))
+              .apply(readAll)
+              .setCoder(coder());
+        }
+      }
+    }
+
+    private static class SplitFn<T> extends DoFn<List<RingRange>, Read<T>> {
 
 Review comment:
  `private static class SplitFn<T> extends DoFn<Read<T>, Read<T>>`
  Note that in the second `Read` (the DoFn's output type) you should set up the 
`RingRange`, so that each individual `Read` can then be read by a DoFn that knows 
how to read `Read` specifications with a `RingRange`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to