jon-wei commented on a change in pull request #8644: Fix Kinesis resharding issues
URL: https://github.com/apache/incubator-druid/pull/8644#discussion_r332853562
##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -212,14 +225,47 @@ protected void scheduleReporting(ScheduledExecutorService reportingExec)
// not yet implemented, see issue #6739
}
+
+ /**
+ * We try to parse the shard number of the shard ID, using a BigInteger because the Kinesis shard ID can be
+ * up to 128 characters. The shard number is used preferentially because it provides a fixed and easily predictable
+ * mapping from shard to task group number.
+ *
+ * If we can't parse the shard number from the ID, then we fall back to hashing the shard ID string.
+ */
@Override
protected int getTaskGroupIdForPartition(String partitionId)
{
if (!partitionIds.contains(partitionId)) {
partitionIds.add(partitionId);
}
- return partitionIds.indexOf(partitionId) % spec.getIoConfig().getTaskCount();
+ BigInteger numberFromShardId = extractShardNumberFromShardId(partitionId);
+ if (numberFromShardId != null) {
+ BigInteger taskCountAsBigInt = BigInteger.valueOf(spec.getIoConfig().getTaskCount());
+ return Math.abs(numberFromShardId.mod(taskCountAsBigInt).intValue());
+ } else {
+ return Math.abs(getHashIntFromShardId(partitionId) % spec.getIoConfig().getTaskCount());
+ }
+ }
+
+ private BigInteger extractShardNumberFromShardId(String shardId)
+ {
+ String numOnly = StringUtils.replace(shardId, "shardId-", "");
+ try {
+ // Kinesis shard ID length can be up to 128 characters
+ // https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html
+ return new BigInteger(numOnly);
+ }
+ catch (NumberFormatException e) {
+ return null;
Review comment:
In this particular case, I think returning null is fine, but the method should have had a `@Nullable` annotation.
I'll be removing this method though, and only supporting the hash path.
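
For reference, a minimal sketch of what a hash-only mapping could look like. This is not the code from the PR: `TaskGroupSketch`, `hashShardId`, and `taskGroupFor` are hypothetical names, and `String.hashCode()` is only a stand-in for whatever `getHashIntFromShardId` actually computes. It uses `Math.floorMod` rather than `Math.abs(hash % taskCount)`; both keep the result in `[0, taskCount)`, though they can pick different groups when the hash is negative.

```java
public class TaskGroupSketch
{
  // Hypothetical stand-in for getHashIntFromShardId; the real hash in the PR may differ.
  static int hashShardId(String shardId)
  {
    return shardId.hashCode();
  }

  // Maps a shard ID to a task group in [0, taskCount).
  // Math.floorMod never returns a negative value for a positive modulus,
  // so no Math.abs call is needed (Math.abs(Integer.MIN_VALUE) is itself negative,
  // which is why taking the modulus before abs matters in the abs-based form).
  static int taskGroupFor(String shardId, int taskCount)
  {
    if (taskCount <= 0) {
      throw new IllegalArgumentException("taskCount must be positive");
    }
    return Math.floorMod(hashShardId(shardId), taskCount);
  }

  public static void main(String[] args)
  {
    String[] shardIds = {"shardId-000000000000", "shardId-000000000001", "not-a-standard-id"};
    for (String id : shardIds) {
      System.out.println(id + " -> group " + taskGroupFor(id, 3));
    }
  }
}
```

With only the hash path there is no parse step that can fail, so the nullable return value goes away entirely.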