[ 
https://issues.apache.org/jira/browse/BEAM-3485?focusedWorklogId=92498&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92498
 ]

ASF GitHub Bot logged work on BEAM-3485:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Apr/18 09:37
            Start Date: 19/Apr/18 09:37
    Worklog Time Spent: 10m 
      Work Description: adejanovski commented on a change in pull request #5124: [BEAM-3485] Fix split generation for Cassandra clusters
URL: https://github.com/apache/beam/pull/5124#discussion_r182688893
 
 

 ##########
 File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
 ##########
 @@ -195,34 +200,30 @@ protected static long getEstimatedSizeBytes(List<TokenRange> tokenRanges) {
       numSplits = 1;
     }
 
-    LOG.info("Number of splits is {}", numSplits);
+    if (null != spec.minNumberOfSplits()) {
+      numSplits = Math.max(numSplits, spec.minNumberOfSplits());
+    }
+    LOG.info("Number of desired splits is {}", numSplits);
 
-    double startRange = MIN_TOKEN;
-    double endRange = MAX_TOKEN;
-    double startToken, endToken;
+    SplitGenerator splitGenerator = new SplitGenerator(cluster.getMetadata().getPartitioner());
+    List<BigInteger> tokens = cluster.getMetadata().getTokenRanges().stream()
+        .map(tokenRange -> new BigInteger(tokenRange.getEnd().getValue().toString()))
+        .collect(Collectors.toList());
+    List<RingRange> splits = splitGenerator.generateSplits(numSplits, tokens);
 
-    endToken = startRange;
-    double incrementValue = endRange - startRange / numSplits;
-    String splitQuery;
-    if (numSplits == 1) {
-      // we have an unique split
-      splitQuery = QueryBuilder.select().from(spec.keyspace(), spec.table()).toString();
-      sourceList.add(new CassandraIO.CassandraSource<>(spec, splitQuery));
-    } else {
-      // we have more than one split
-      for (int i = 0; i < numSplits; i++) {
-        startToken = endToken;
-        endToken = startToken + incrementValue;
-        Select.Where builder = QueryBuilder.select().from(spec.keyspace(), spec.table()).where();
-        if (i > 0) {
-          builder = builder.and(QueryBuilder.gte("token($pk)", startToken));
-        }
-        if (i < (numSplits - 1)) {
-          builder = builder.and(QueryBuilder.lt("token($pk)", endToken));
-        }
-        sourceList.add(new CassandraIO.CassandraSource(spec, builder.toString()));
-      }
+    LOG.info("{} splits were actually generated", splits.size());
+
+    for (RingRange split:splits) {
+      Select.Where builder = QueryBuilder.select().from(spec.keyspace(), spec.table()).where();
+        builder = builder.and(QueryBuilder.gte("token(" + partitionKey + ")", split.getStart()));
 
 Review comment:
   It's odd; shouldn't checkstyle catch this?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 92498)
    Time Spent: 2.5h  (was: 2h 20m)

> CassandraIO.read() splitting produces invalid queries
> -----------------------------------------------------
>
>                 Key: BEAM-3485
>                 URL: https://issues.apache.org/jira/browse/BEAM-3485
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-cassandra
>            Reporter: Eugene Kirpichov
>            Assignee: Alexander Dejanovski
>            Priority: Major
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> See 
> [https://stackoverflow.com/questions/48090668/how-to-increase-dataflow-read-parallelism-from-cassandra/48131264?noredirect=1#comment83548442_48131264]
> As the question author points out, the error is likely that token($pk) should 
> be token(pk). This was likely masked by BEAM-3424 and BEAM-3425: the 
> splitting code path was effectively never invoked and has been broken since 
> the first PR, so there are likely other bugs.
> When testing this issue, we must ensure good code coverage in an IT against a 
> real Cassandra instance.
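
To make the token($pk) versus token(pk) point concrete, here is a minimal sketch of per-split query generation that substitutes the real partition key name into the token() call. It is not the code from the pull request, which derives splits from the cluster's ring tokens via SplitGenerator. The class name TokenRangeQueries, the method splitQueries, the even division of the ring, and the use of the Murmur3Partitioner token bounds (-2^63 to 2^63 - 1) are all assumptions made for illustration, and the queries are built as plain strings rather than with the driver's QueryBuilder.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Beam: builds one CQL query string per split.
final class TokenRangeQueries {
  // Assumption: Murmur3Partitioner, whose tokens span the signed 64-bit range.
  static final BigInteger MIN_TOKEN = BigInteger.valueOf(Long.MIN_VALUE);
  static final BigInteger MAX_TOKEN = BigInteger.valueOf(Long.MAX_VALUE);

  static List<String> splitQueries(
      String keyspace, String table, String partitionKey, int numSplits) {
    if (numSplits < 1) {
      throw new IllegalArgumentException("numSplits must be >= 1");
    }
    String base = "SELECT * FROM " + keyspace + "." + table;
    // The partition key name is substituted here, so the query contains
    // token(id) and never the literal placeholder token($pk).
    String tokenExpr = "token(" + partitionKey + ")";
    // Parenthesised: (max - min) / numSplits, computed exactly with BigInteger.
    BigInteger width =
        MAX_TOKEN.subtract(MIN_TOKEN).divide(BigInteger.valueOf(numSplits));

    List<String> queries = new ArrayList<>();
    for (int i = 0; i < numSplits; i++) {
      BigInteger start = MIN_TOKEN.add(width.multiply(BigInteger.valueOf(i)));
      BigInteger end = start.add(width);
      List<String> clauses = new ArrayList<>();
      if (i > 0) {
        clauses.add(tokenExpr + " >= " + start); // first split has no lower bound
      }
      if (i < numSplits - 1) {
        clauses.add(tokenExpr + " < " + end); // last split has no upper bound
      }
      queries.add(
          clauses.isEmpty() ? base : base + " WHERE " + String.join(" AND ", clauses));
    }
    return queries;
  }
}
```

Calling TokenRangeQueries.splitQueries("ks", "tbl", "id", 2) yields one query bounded above by token(id) < -1 and one bounded below by token(id) >= -1. The sketch also avoids a second problem visible in the removed code: in `endRange - startRange / numSplits`, the division applies only to `startRange`, because division binds more tightly than subtraction.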



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
