jihoonson commented on a change in pull request #5996: Fix NPE while handling
CheckpointNotice in KafkaSupervisor
URL: https://github.com/apache/incubator-druid/pull/5996#discussion_r202414735
##########
File path:
extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
##########
@@ -1087,23 +1088,18 @@ public Boolean apply(KafkaIndexTask.Status status)
}
return false;
} else {
- final TaskGroup taskGroup = new TaskGroup(
- ImmutableMap.copyOf(
- kafkaTask.getIOConfig()
- .getStartPartitions()
- .getPartitionOffsetMap()
- ),
kafkaTask.getIOConfig().getMinimumMessageTime(),
- kafkaTask.getIOConfig().getMaximumMessageTime()
- );
- if (taskGroups.putIfAbsent(
+ final TaskGroup taskGroup =
taskGroups.computeIfAbsent(
taskGroupId,
- taskGroup
- ) == null) {
-
sequenceTaskGroup.put(generateSequenceName(taskGroup),
taskGroups.get(taskGroupId));
- log.info("Created new task group [%d]",
taskGroupId);
Review comment:
Sounds good. I realized this log can be useful to debug supervisor's
behavior for a specific taskGroupId.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]