yifan-c commented on code in PR #34:
URL: 
https://github.com/apache/cassandra-analytics/pull/34#discussion_r1465745215


##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -205,28 +220,12 @@ private Set<RingInstance> instancesFromMapping(Map<Range<BigInteger>, List<RingI
      */
     private StreamSession maybeCreateStreamSession(TaskContext taskContext,
                                                    StreamSession streamSession,
-                                                   Tuple2<DecoratedKey, Object[]> rowData,
-                                                   Set<Range<BigInteger>> subRanges,
+                                                   Range<BigInteger> currentRange,
                                                    ReplicaAwareFailureHandler<RingInstance> failureHandler,
                                                    List<StreamResult> results)
     throws IOException, ExecutionException, InterruptedException
     {
-        BigInteger token = rowData._1().getToken();
-        Range<BigInteger> tokenRange = getTokenRange(taskContext);
-
-        Preconditions.checkState(tokenRange.contains(token),
-                                 String.format("Received Token %s outside of expected range %s", token, tokenRange));
-
-        // We have split ranges likely resulting from pending nodes
-        // Evaluate creating a new session if the token from current row is part of a sub-range
-        if (subRanges.size() > 1)
-        {
-            // Create session using sub-range that contains the token from current row
-            Optional<Range<BigInteger>> matchingSubRangeOpt = subRanges.stream().filter(r -> r.contains(token)).findFirst();
-            Preconditions.checkState(matchingSubRangeOpt.isPresent(),
-                                     String.format("Received Token %s outside of expected sub-ranges %s", token, subRanges));
-            streamSession = maybeCreateSubRangeSession(taskContext, streamSession, failureHandler, results, matchingSubRangeOpt.get());

Review Comment:
   Avoid iterating through the sub-ranges to find the matching range. We know 
which range is current, since the list now preserves the order.
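
   A minimal sketch of the idea, not the code in this PR: when rows arrive in 
token order and the sub-ranges are kept in order, a cursor that only moves 
forward can replace the per-row `stream().filter(...)` search. `SubRangeCursor`, 
`TokenRange`, and `assign` are hypothetical names for illustration; `TokenRange` 
stands in for `Range<BigInteger>` and is assumed to be open-closed.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

class SubRangeCursor
{
    // Hypothetical stand-in for Range<BigInteger>: the interval (lower, upper]
    static final class TokenRange
    {
        final BigInteger lower;
        final BigInteger upper;

        TokenRange(long lower, long upper)
        {
            this.lower = BigInteger.valueOf(lower);
            this.upper = BigInteger.valueOf(upper);
        }

        boolean contains(BigInteger token)
        {
            return token.compareTo(lower) > 0 && token.compareTo(upper) <= 0;
        }

        @Override
        public String toString()
        {
            return "(" + lower + ", " + upper + "]";
        }
    }

    // Returns, for each token, the index of the sub-range that contains it.
    // Assumes tokens are in ascending order and subRanges are sorted and non-overlapping.
    static List<Integer> assign(List<TokenRange> subRanges, List<BigInteger> sortedTokens)
    {
        List<Integer> indexes = new ArrayList<>();
        int current = 0;
        for (BigInteger token : sortedTokens)
        {
            // Move the cursor forward instead of rescanning the list from the start
            while (current < subRanges.size() && !subRanges.get(current).contains(token))
            {
                current++;
            }
            if (current == subRanges.size())
            {
                throw new IllegalStateException(
                    String.format("Received Token %s outside of expected sub-ranges %s", token, subRanges));
            }
            indexes.add(current);
        }
        return indexes;
    }
}
```

   Each sub-range is visited at most once across the whole partition, so the 
cost is linear in rows plus sub-ranges rather than rows times sub-ranges.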



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

