gianm commented on a change in pull request #8644: Fix Kinesis resharding issues
URL: https://github.com/apache/incubator-druid/pull/8644#discussion_r332837183
##########
File path:
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -212,14 +225,47 @@ protected void scheduleReporting(ScheduledExecutorService reportingExec)
// not yet implemented, see issue #6739
}
+
+ /**
+ * We try to parse the shard number of the shard ID, using a BigInteger because the Kinesis shard ID can be
+ * up to 128 characters. The shard number is used preferentially because it provides a fixed and easily predictable
+ * mapping from shard to task group number.
+ *
+ * If we can't parse the shard number from the ID, then we fall back to hashing the shard ID string.
+ */
@Override
protected int getTaskGroupIdForPartition(String partitionId)
{
if (!partitionIds.contains(partitionId)) {
partitionIds.add(partitionId);
}
- return partitionIds.indexOf(partitionId) % spec.getIoConfig().getTaskCount();
+ BigInteger numberFromShardId = extractShardNumberFromShardId(partitionId);
+ if (numberFromShardId != null) {
+ BigInteger taskCountAsBigInt = BigInteger.valueOf(spec.getIoConfig().getTaskCount());
+ return Math.abs(numberFromShardId.mod(taskCountAsBigInt).intValue());
+ } else {
+ return Math.abs(getHashIntFromShardId(partitionId) % spec.getIoConfig().getTaskCount());
+ }
+ }
+
+ private BigInteger extractShardNumberFromShardId(String shardId)
+ {
+ String numOnly = StringUtils.replace(shardId, "shardId-", "");
+ try {
+ // Kinesis shard ID length can be up to 128 characters
+ // https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html
+ return new BigInteger(numOnly);
+ }
+ catch (NumberFormatException e) {
+ return null;
Review comment:
We had a discussion about this a while back and settled on preferring
nulls (with `@Nullable` annotations, which this method should have). I went
looking for a link to the discussion but unfortunately can't find it right now,
nor can I recall who came forward with what reasons. This PR isn't the right
time to revisit that discussion, but it could be a good one for the dev mailing
list.
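
For illustration only, here is a rough sketch of what the annotated helper and the mapping it feeds might look like. It is not the code from this PR: the class name `ShardMappingSketch`, the locally declared `Nullable` annotation (a stand-in for `javax.annotation.Nullable`, so the snippet compiles without extra dependencies), the prefix stripping via `startsWith`, and the use of `String.hashCode()` in place of `getHashIntFromShardId` are all assumptions.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.math.BigInteger;

class ShardMappingSketch
{
  // Hypothetical stand-in for javax.annotation.Nullable, declared locally
  // so that this sketch has no external dependencies.
  @Documented
  @Retention(RetentionPolicy.CLASS)
  @interface Nullable {}

  private static final String PREFIX = "shardId-";

  // Returns the numeric part of the shard ID, or null if it cannot be parsed.
  // The @Nullable annotation makes the null return explicit to callers.
  @Nullable
  static BigInteger extractShardNumber(String shardId)
  {
    String numOnly = shardId.startsWith(PREFIX) ? shardId.substring(PREFIX.length()) : shardId;
    try {
      // BigInteger is used because the shard ID can be up to 128 characters.
      return new BigInteger(numOnly);
    }
    catch (NumberFormatException e) {
      return null;
    }
  }

  // Maps a shard ID to a task group in [0, taskCount).
  static int taskGroupFor(String shardId, int taskCount)
  {
    BigInteger shardNumber = extractShardNumber(shardId);
    if (shardNumber != null) {
      // BigInteger.mod always returns a non-negative result.
      return shardNumber.mod(BigInteger.valueOf(taskCount)).intValue();
    }
    // Assumed fallback: hash the whole ID string; floorMod keeps the result non-negative.
    return Math.floorMod(shardId.hashCode(), taskCount);
  }

  public static void main(String[] args)
  {
    System.out.println(taskGroupFor("shardId-000000000005", 3));
    System.out.println(extractShardNumber("shardId-abc"));
  }
}
```

With the annotation in place, the null check at the call site documents itself, and static analysis can flag callers that forget it.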