jon-wei commented on a change in pull request #8644: Fix Kinesis resharding issues
URL: https://github.com/apache/incubator-druid/pull/8644#discussion_r332853562
 
 

 ##########
 File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
 ##########
 @@ -212,14 +225,47 @@ protected void scheduleReporting(ScheduledExecutorService reportingExec)
     // not yet implemented, see issue #6739
   }
 
+
+  /**
+   * We try to parse the shard number of the shard ID, using a BigInteger because the Kinesis shard ID can be
+   * up to 128 characters. The shard number is used preferentially because it provides a fixed and easily predictable
+   * mapping from shard to task group number.
+   *
+   * If we can't parse the shard number from the ID, then we fall back to hashing the shard ID string.
+   */
   @Override
   protected int getTaskGroupIdForPartition(String partitionId)
   {
     if (!partitionIds.contains(partitionId)) {
       partitionIds.add(partitionId);
     }
 
-    return partitionIds.indexOf(partitionId) % spec.getIoConfig().getTaskCount();
+    BigInteger numberFromShardId = extractShardNumberFromShardId(partitionId);
+    if (numberFromShardId != null) {
+      BigInteger taskCountAsBigInt = BigInteger.valueOf(spec.getIoConfig().getTaskCount());
+      return Math.abs(numberFromShardId.mod(taskCountAsBigInt).intValue());
+    } else {
+      return Math.abs(getHashIntFromShardId(partitionId) % spec.getIoConfig().getTaskCount());
+    }
+  }
+
+  private BigInteger extractShardNumberFromShardId(String shardId)
+  {
+    String numOnly = StringUtils.replace(shardId, "shardId-", "");
+    try {
+      // Kinesis shard ID length can be up to 128 characters
+      // https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html
+      return new BigInteger(numOnly);
+    }
+    catch (NumberFormatException e) {
+      return null;
 
 Review comment:
   In this particular case, I think returning null is fine, but the method should have had a `@Nullable` annotation.
   
   I'll be removing this method, though, and supporting only the hash path.
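
   For reference, a minimal sketch of both options mentioned above, not the actual follow-up change. It assumes a locally declared `Nullable` stand-in (the real code would presumably import `javax.annotation.Nullable`), uses plain `String.replace` instead of Druid's `StringUtils.replace`, and introduces a hypothetical `getTaskGroupIdByHash` helper based on `String.hashCode()`, since `getHashIntFromShardId` is not shown in this hunk.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.math.BigInteger;

class ShardGroupingSketch
{
  // Assumption: local stand-in for javax.annotation.Nullable so the sketch
  // compiles without extra dependencies.
  @Retention(RetentionPolicy.CLASS)
  @Target({ElementType.METHOD})
  @interface Nullable {}

  // Option 1: keep the parsing method, but document the null return.
  @Nullable
  static BigInteger extractShardNumberFromShardId(String shardId)
  {
    String numOnly = shardId.replace("shardId-", "");
    try {
      return new BigInteger(numOnly);
    }
    catch (NumberFormatException e) {
      // Callers must handle null and fall back to hashing.
      return null;
    }
  }

  // Option 2 (hypothetical helper): hash-only mapping from shard ID to task group.
  // Math.floorMod keeps the result in [0, taskCount) even for negative hash codes.
  static int getTaskGroupIdByHash(String shardId, int taskCount)
  {
    return Math.floorMod(shardId.hashCode(), taskCount);
  }

  public static void main(String[] args)
  {
    System.out.println(extractShardNumberFromShardId("shardId-000000000002"));
    System.out.println(getTaskGroupIdByHash("shardId-000000000002", 3));
  }
}
```

   With the hash-only variant there is no nullable return to annotate, at the cost of a less predictable shard-to-group mapping than the shard-number approach described in the Javadoc.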

