gianm closed pull request #6291: Using Entry directly instead of Map.Entry in
KafkaSupervisor
URL: https://github.com/apache/incubator-druid/pull/6291
This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign (forked) pull request once
it is merged, the diff is reproduced below for the sake of provenance:
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 78652a70b91..8d3380aff68 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -793,7 +793,7 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
// defend against consecutive reset requests from replicas
// as well as the case where the metadata store do not have an entry
for the reset partitions
boolean doReset = false;
- for (Map.Entry<Integer, Long> resetPartitionOffset :
resetKafkaMetadata.getKafkaPartitions()
+ for (Entry<Integer, Long> resetPartitionOffset :
resetKafkaMetadata.getKafkaPartitions()
.getPartitionOffsetMap()
.entrySet()) {
final Long partitionOffsetInMetadataStore = currentMetadata == null
@@ -876,7 +876,7 @@ void gracefulShutdownInternal() throws ExecutionException,
InterruptedException,
// checkTaskDuration() will be triggered. This is better than just telling
these tasks to publish whatever they
// have, as replicas that are supposed to publish the same segment may not
have read the same set of offsets.
for (TaskGroup taskGroup : taskGroups.values()) {
- for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
+ for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
if
(taskInfoProvider.getTaskLocation(entry.getKey()).equals(TaskLocation.unknown()))
{
killTask(entry.getKey());
} else {
@@ -915,7 +915,7 @@ String generateSequenceName(
{
StringBuilder sb = new StringBuilder();
- for (Map.Entry<Integer, Long> entry : startPartitions.entrySet()) {
+ for (Entry<Integer, Long> entry : startPartitions.entrySet()) {
sb.append(StringUtils.format("+%d(%d)", entry.getKey(),
entry.getValue()));
}
String partitionOffsetStr = sb.toString().substring(1);
@@ -1083,7 +1083,7 @@ public Boolean apply(KafkaIndexTask.Status status)
// existing) so that the next tasks will start
reading from where this task left off
Map<Integer, Long> publishingTaskEndOffsets =
taskClient.getEndOffsets(taskId);
- for (Map.Entry<Integer, Long> entry :
publishingTaskEndOffsets.entrySet()) {
+ for (Entry<Integer, Long> entry :
publishingTaskEndOffsets.entrySet()) {
Integer partition = entry.getKey();
Long offset = entry.getValue();
ConcurrentHashMap<Integer, Long> partitionOffsets
= partitionGroups.get(
@@ -1375,7 +1375,7 @@ private void updateTaskStatus() throws
ExecutionException, InterruptedException,
// update status (and startTime if unknown) of current tasks in taskGroups
for (TaskGroup group : taskGroups.values()) {
- for (Map.Entry<String, TaskData> entry : group.tasks.entrySet()) {
+ for (Entry<String, TaskData> entry : group.tasks.entrySet()) {
final String taskId = entry.getKey();
final TaskData taskData = entry.getValue();
@@ -1418,7 +1418,7 @@ public Boolean apply(@Nullable DateTime startTime)
// update status of pending completion tasks in pendingCompletionTaskGroups
for (List<TaskGroup> taskGroups : pendingCompletionTaskGroups.values()) {
for (TaskGroup group : taskGroups) {
- for (Map.Entry<String, TaskData> entry : group.tasks.entrySet()) {
+ for (Entry<String, TaskData> entry : group.tasks.entrySet()) {
entry.getValue().status =
taskStorage.getStatus(entry.getKey()).get();
}
}
@@ -1441,7 +1441,7 @@ private void checkTaskDuration() throws
InterruptedException, ExecutionException
final List<ListenableFuture<Map<Integer, Long>>> futures =
Lists.newArrayList();
final List<Integer> futureGroupIds = Lists.newArrayList();
- for (Map.Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
+ for (Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
Integer groupId = entry.getKey();
TaskGroup group = entry.getValue();
@@ -1474,7 +1474,7 @@ private void checkTaskDuration() throws
InterruptedException, ExecutionException
pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new
CopyOnWriteArrayList<>()).add(group);
// set endOffsets as the next startOffsets
- for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
+ for (Entry<Integer, Long> entry : endOffsets.entrySet()) {
partitionGroups.get(groupId).put(entry.getKey(), entry.getValue());
}
} else {
@@ -1500,9 +1500,9 @@ private void checkTaskDuration() throws
InterruptedException, ExecutionException
{
if (finalize) {
// 1) Check if any task completed (in which case we're done) and kill
unassigned tasks
- Iterator<Map.Entry<String, TaskData>> i =
taskGroup.tasks.entrySet().iterator();
+ Iterator<Entry<String, TaskData>> i =
taskGroup.tasks.entrySet().iterator();
while (i.hasNext()) {
- Map.Entry<String, TaskData> taskEntry = i.next();
+ Entry<String, TaskData> taskEntry = i.next();
String taskId = taskEntry.getKey();
TaskData task = taskEntry.getValue();
@@ -1564,7 +1564,7 @@ private void checkTaskDuration() throws
InterruptedException, ExecutionException
taskGroup.tasks.remove(taskId);
} else { // otherwise build a map of the highest offsets seen
- for (Map.Entry<Integer, Long> offset : result.entrySet()) {
+ for (Entry<Integer, Long> offset : result.entrySet()) {
if (!endOffsets.containsKey(offset.getKey())
||
endOffsets.get(offset.getKey()).compareTo(offset.getValue()) < 0) {
endOffsets.put(offset.getKey(), offset.getValue());
@@ -1642,7 +1642,7 @@ private void checkPendingCompletionTasks() throws
ExecutionException, Interrupte
{
List<ListenableFuture<?>> futures = Lists.newArrayList();
- for (Map.Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingGroupList
: pendingCompletionTaskGroups.entrySet()) {
+ for (Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingGroupList :
pendingCompletionTaskGroups.entrySet()) {
boolean stopTasksInTaskGroup = false;
Integer groupId = pendingGroupList.getKey();
@@ -1723,9 +1723,9 @@ private void checkPendingCompletionTasks() throws
ExecutionException, Interrupte
private void checkCurrentTaskState() throws ExecutionException,
InterruptedException, TimeoutException
{
List<ListenableFuture<?>> futures = Lists.newArrayList();
- Iterator<Map.Entry<Integer, TaskGroup>> iTaskGroups =
taskGroups.entrySet().iterator();
+ Iterator<Entry<Integer, TaskGroup>> iTaskGroups =
taskGroups.entrySet().iterator();
while (iTaskGroups.hasNext()) {
- Map.Entry<Integer, TaskGroup> taskGroupEntry = iTaskGroups.next();
+ Entry<Integer, TaskGroup> taskGroupEntry = iTaskGroups.next();
Integer groupId = taskGroupEntry.getKey();
TaskGroup taskGroup = taskGroupEntry.getValue();
@@ -1737,9 +1737,9 @@ private void checkCurrentTaskState() throws
ExecutionException, InterruptedExcep
log.debug("Task group [%d] pre-pruning: %s", groupId,
taskGroup.taskIds());
- Iterator<Map.Entry<String, TaskData>> iTasks =
taskGroup.tasks.entrySet().iterator();
+ Iterator<Entry<String, TaskData>> iTasks =
taskGroup.tasks.entrySet().iterator();
while (iTasks.hasNext()) {
- Map.Entry<String, TaskData> task = iTasks.next();
+ Entry<String, TaskData> task = iTasks.next();
String taskId = task.getKey();
TaskData taskData = task.getValue();
@@ -1810,7 +1810,7 @@ void createNewTasks() throws JsonProcessingException
// iterate through all the current task groups and make sure each one has
the desired number of replica tasks
boolean createdTask = false;
- for (Map.Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
+ for (Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
TaskGroup taskGroup = entry.getValue();
Integer groupId = entry.getKey();
@@ -1898,7 +1898,7 @@ private void createKafkaTasksForGroup(int groupId, int
replicas) throws JsonProc
private ImmutableMap<Integer, Long>
generateStartingOffsetsForPartitionGroup(int groupId)
{
ImmutableMap.Builder<Integer, Long> builder = ImmutableMap.builder();
- for (Map.Entry<Integer, Long> entry :
partitionGroups.get(groupId).entrySet()) {
+ for (Entry<Integer, Long> entry : partitionGroups.get(groupId).entrySet())
{
Integer partition = entry.getKey();
Long offset = entry.getValue();
@@ -2023,7 +2023,7 @@ private boolean isTaskCurrent(int taskGroupId, String
taskId)
}
final List<ListenableFuture<Void>> futures = Lists.newArrayList();
- for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
+ for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
final String taskId = entry.getKey();
final TaskData taskData = entry.getValue();
if (taskData.status == null) {
@@ -2108,7 +2108,7 @@ private boolean isTaskInPendingCompletionGroups(String
taskId)
try {
for (TaskGroup taskGroup : taskGroups.values()) {
- for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
+ for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
String taskId = entry.getKey();
@Nullable DateTime startTime = entry.getValue().startTime;
Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
@@ -2135,7 +2135,7 @@ private boolean isTaskInPendingCompletionGroups(String
taskId)
for (List<TaskGroup> taskGroups : pendingCompletionTaskGroups.values()) {
for (TaskGroup taskGroup : taskGroups) {
- for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet())
{
+ for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
String taskId = entry.getKey();
@Nullable DateTime startTime = entry.getValue().startTime;
Map<Integer, Long> currentOffsets =
entry.getValue().currentOffsets;
@@ -2203,7 +2203,7 @@ private void updateLatestOffsetsFromKafka()
.stream()
.flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
.flatMap(taskData ->
taskData.getValue().currentOffsets.entrySet().stream())
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
Long::max));
+ .collect(Collectors.toMap(Entry::getKey, Entry::getValue, Long::max));
}
private Map<Integer, Long> getLagPerPartition(Map<Integer, Long>
currentOffsets)
@@ -2213,7 +2213,7 @@ private void updateLatestOffsetsFromKafka()
.stream()
.collect(
Collectors.toMap(
- Map.Entry::getKey,
+ Entry::getKey,
e -> latestOffsetsFromKafka != null
&& latestOffsetsFromKafka.get(e.getKey()) != null
&& e.getValue() != null
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]