vmarquez commented on a change in pull request #10546: [BEAM-9008] Add 
CassandraIO readAll method
URL: https://github.com/apache/beam/pull/10546#discussion_r405875425
 
 

 ##########
 File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
 ##########
 @@ -318,15 +354,84 @@ private CassandraIO() {}
       return builder().setMapperFactoryFn(mapperFactory).build();
     }
 
+    public Read<T> withRingRange(RingRange ringRange) {
+      return withRingRange(ValueProvider.StaticValueProvider.of(ringRange));
+    }
+
+    public Read<T> withRingRange(ValueProvider<RingRange> ringRange) {
+      return builder().setRingRange(ringRange).build();
+    }
+
     @Override
     public PCollection<T> expand(PBegin input) {
       checkArgument((hosts() != null && port() != null), "WithHosts() and 
withPort() are required");
       checkArgument(keyspace() != null, "withKeyspace() is required");
       checkArgument(table() != null, "withTable() is required");
       checkArgument(entity() != null, "withEntity() is required");
       checkArgument(coder() != null, "withCoder() is required");
+      try (Cluster cluster =
+          getCluster(hosts(), port(), username(), password(), localDc(), 
consistencyLevel())) {
+        Integer splitCount = cluster.getMetadata().getAllHosts().size();
+        if (minNumberOfSplits() != null && minNumberOfSplits().get() != null) {
+          splitCount = minNumberOfSplits().get();
+        }
+        ReadAll<T> readAll =
+            CassandraIO.<T>readAll()
+                .withCoder(this.coder())
+                .withConsistencyLevel(this.consistencyLevel())
+                .withEntity(this.entity())
+                .withHosts(this.hosts())
+                .withKeyspace(this.keyspace())
+                .withLocalDc(this.localDc())
+                .withPort(this.port())
+                .withPassword(this.password())
+                .withQuery(this.query())
+                .withTable(this.table())
+                .withUsername(this.username())
+                .withSplitCount(splitCount)
+                .withMapperFactoryFn(this.mapperFactoryFn());
 
-      return input.apply(org.apache.beam.sdk.io.Read.from(new 
CassandraSource<>(this, null)));
+        if (isMurmur3Partitioner(cluster)) {
+          LOG.info("Murmur3Partitioner detected, splitting");
+
+          List<BigInteger> tokens =
+              cluster.getMetadata().getTokenRanges().stream()
+                  .map(tokenRange -> new 
BigInteger(tokenRange.getEnd().getValue().toString()))
+                  .collect(Collectors.toList());
+
+          SplitGenerator splitGenerator =
+              new SplitGenerator(cluster.getMetadata().getPartitioner());
+
+          List<Read<T>> splits =
 
 Review comment:
   This leaves the splitting logic more or less the same as it was before. 
Is there a reason we need to change it now? 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to