arjunashok commented on code in PR #34:
URL: https://github.com/apache/cassandra-analytics/pull/34#discussion_r1466867119


##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -139,16 +137,33 @@ public List<StreamResult> write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceI
         Map<String, Object> valueMap = new HashMap<>();
         try
         {
-            Set<Range<BigInteger>> newRanges = new HashSet<>(initialTokenRangeMapping.getRangeMap().asMapOfRanges().keySet());
+            // Use list to preserve order of ranges
+            List<Range<BigInteger>> newRanges = new ArrayList<>(initialTokenRangeMapping.getRangeMap()
+                                                                                        .asMapOfRanges()
+                                                                                        .keySet());
             Range<BigInteger> tokenRange = getTokenRange(taskContext);
-            Set<Range<BigInteger>> subRanges = newRanges.contains(tokenRange) ?
-                                               Collections.singleton(tokenRange) :
-                                               getIntersectingSubRanges(newRanges, tokenRange);
+            List<Range<BigInteger>> subRanges = newRanges.contains(tokenRange) ?
+                                                Collections.singletonList(tokenRange) : // no overlaps
+                                                getIntersectingSubRanges(newRanges, tokenRange); // has overlaps; split into sub-ranges
 
+            int currentRangeIndex = 0;
+            Range<BigInteger> currentRange = subRanges.get(currentRangeIndex);
             while (dataIterator.hasNext())
             {
                 Tuple2<DecoratedKey, Object[]> rowData = dataIterator.next();
-                streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, subRanges, failureHandler, results);
+                // advance to new sub-range if needed

Review Comment:
   As discussed offline, we will have to check all the sub-ranges for the token 
corresponding to the row, since the row being written may fall in a sub-range 
other than the first two.
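A minimal Java sketch of the lookup this comment asks for: check the current sub-range first, and on a miss scan every sub-range rather than only stepping to the next one. `SubRangeLookup`, its `TokenRange` record, and `findSubRangeIndex` are hypothetical names, not part of cassandra-analytics. To stay self-contained, the sketch uses a plain record instead of Guava's `Range<BigInteger>` and assumes open-closed `(start, end]` token ranges.

```java
import java.math.BigInteger;
import java.util.List;

/**
 * Hypothetical helper: find which sub-range a row's token belongs to,
 * considering all sub-ranges instead of only the next one in order.
 */
public final class SubRangeLookup
{
    /** Assumed open-closed token range (start, end]; stands in for Range<BigInteger>. */
    public record TokenRange(BigInteger start, BigInteger end)
    {
        public boolean contains(BigInteger token)
        {
            return token.compareTo(start) > 0 && token.compareTo(end) <= 0;
        }
    }

    private SubRangeLookup()
    {
    }

    /**
     * Returns the index of the sub-range containing {@code token}, or -1 if none does.
     * The current sub-range is checked first (the common case when rows arrive in
     * token order); otherwise every sub-range is checked, not just the next one.
     */
    public static int findSubRangeIndex(List<TokenRange> subRanges, int currentIndex, BigInteger token)
    {
        if (currentIndex >= 0 && currentIndex < subRanges.size()
            && subRanges.get(currentIndex).contains(token))
        {
            return currentIndex; // fast path: row still belongs to the current sub-range
        }
        for (int i = 0; i < subRanges.size(); i++)
        {
            if (subRanges.get(i).contains(token))
            {
                return i; // the row may land in any sub-range, e.g. the third or later
            }
        }
        return -1; // token lies outside every sub-range
    }
}
```

With sub-ranges `(0, 10]`, `(10, 20]`, `(20, 30]` and a current index of 0, token 25 resolves to index 2, a case that only advancing from the first sub-range to the second would miss. If the list is sorted by token, as the "preserve order of ranges" comment in the diff suggests, a binary search could replace the linear scan.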



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
