This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 00ea8c0  using Entry directly instead of Map.Entry in KafkaSupervisor (#6291)
00ea8c0 is described below

commit 00ea8c00acf61a9c189ac01ed20febfac9974925
Author: QiuMM <[email protected]>
AuthorDate: Thu Sep 27 10:01:36 2018 +0800

    using Entry directly instead of Map.Entry in KafkaSupervisor (#6291)
---
 .../indexing/kafka/supervisor/KafkaSupervisor.java | 46 +++++++++++-----------
 1 file changed, 23 insertions(+), 23 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 07e98f3..67900f6 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -783,7 +783,7 @@ public class KafkaSupervisor implements Supervisor
         // defend against consecutive reset requests from replicas
         // as well as the case where the metadata store do not have an entry 
for the reset partitions
         boolean doReset = false;
-        for (Map.Entry<Integer, Long> resetPartitionOffset : 
resetKafkaMetadata.getKafkaPartitions()
+        for (Entry<Integer, Long> resetPartitionOffset : 
resetKafkaMetadata.getKafkaPartitions()
                                                                                
.getPartitionOffsetMap()
                                                                                
.entrySet()) {
           final Long partitionOffsetInMetadataStore = currentMetadata == null
@@ -866,7 +866,7 @@ public class KafkaSupervisor implements Supervisor
     // checkTaskDuration() will be triggered. This is better than just telling 
these tasks to publish whatever they
     // have, as replicas that are supposed to publish the same segment may not 
have read the same set of offsets.
     for (TaskGroup taskGroup : taskGroups.values()) {
-      for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
+      for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
         if 
(taskInfoProvider.getTaskLocation(entry.getKey()).equals(TaskLocation.unknown()))
 {
           killTask(entry.getKey());
         } else {
@@ -914,7 +914,7 @@ public class KafkaSupervisor implements Supervisor
   {
     StringBuilder sb = new StringBuilder();
 
-    for (Map.Entry<Integer, Long> entry : startPartitions.entrySet()) {
+    for (Entry<Integer, Long> entry : startPartitions.entrySet()) {
       sb.append(StringUtils.format("+%d(%d)", entry.getKey(), 
entry.getValue()));
     }
     String partitionOffsetStr = sb.toString().substring(1);
@@ -1073,7 +1073,7 @@ public class KafkaSupervisor implements Supervisor
                           // existing) so that the next tasks will start 
reading from where this task left off
                           Map<Integer, Long> publishingTaskEndOffsets = 
taskClient.getEndOffsets(taskId);
 
-                          for (Map.Entry<Integer, Long> entry : 
publishingTaskEndOffsets.entrySet()) {
+                          for (Entry<Integer, Long> entry : 
publishingTaskEndOffsets.entrySet()) {
                             Integer partition = entry.getKey();
                             Long offset = entry.getValue();
                             ConcurrentHashMap<Integer, Long> partitionOffsets 
= partitionGroups.get(
@@ -1380,7 +1380,7 @@ public class KafkaSupervisor implements Supervisor
 
     // update status (and startTime if unknown) of current tasks in taskGroups
     for (TaskGroup group : taskGroups.values()) {
-      for (Map.Entry<String, TaskData> entry : group.tasks.entrySet()) {
+      for (Entry<String, TaskData> entry : group.tasks.entrySet()) {
         final String taskId = entry.getKey();
         final TaskData taskData = entry.getValue();
 
@@ -1423,7 +1423,7 @@ public class KafkaSupervisor implements Supervisor
     // update status of pending completion tasks in pendingCompletionTaskGroups
     for (List<TaskGroup> taskGroups : pendingCompletionTaskGroups.values()) {
       for (TaskGroup group : taskGroups) {
-        for (Map.Entry<String, TaskData> entry : group.tasks.entrySet()) {
+        for (Entry<String, TaskData> entry : group.tasks.entrySet()) {
           entry.getValue().status = 
taskStorage.getStatus(entry.getKey()).get();
         }
       }
@@ -1446,7 +1446,7 @@ public class KafkaSupervisor implements Supervisor
     final List<ListenableFuture<Map<Integer, Long>>> futures = 
Lists.newArrayList();
     final List<Integer> futureGroupIds = Lists.newArrayList();
 
-    for (Map.Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
+    for (Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
       Integer groupId = entry.getKey();
       TaskGroup group = entry.getValue();
 
@@ -1479,7 +1479,7 @@ public class KafkaSupervisor implements Supervisor
         pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new 
CopyOnWriteArrayList<>()).add(group);
 
         // set endOffsets as the next startOffsets
-        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
+        for (Entry<Integer, Long> entry : endOffsets.entrySet()) {
           partitionGroups.get(groupId).put(entry.getKey(), entry.getValue());
         }
       } else {
@@ -1505,9 +1505,9 @@ public class KafkaSupervisor implements Supervisor
   {
     if (finalize) {
       // 1) Check if any task completed (in which case we're done) and kill 
unassigned tasks
-      Iterator<Map.Entry<String, TaskData>> i = 
taskGroup.tasks.entrySet().iterator();
+      Iterator<Entry<String, TaskData>> i = 
taskGroup.tasks.entrySet().iterator();
       while (i.hasNext()) {
-        Map.Entry<String, TaskData> taskEntry = i.next();
+        Entry<String, TaskData> taskEntry = i.next();
         String taskId = taskEntry.getKey();
         TaskData task = taskEntry.getValue();
 
@@ -1569,7 +1569,7 @@ public class KafkaSupervisor implements Supervisor
                 taskGroup.tasks.remove(taskId);
 
               } else { // otherwise build a map of the highest offsets seen
-                for (Map.Entry<Integer, Long> offset : result.entrySet()) {
+                for (Entry<Integer, Long> offset : result.entrySet()) {
                   if (!endOffsets.containsKey(offset.getKey())
                       || 
endOffsets.get(offset.getKey()).compareTo(offset.getValue()) < 0) {
                     endOffsets.put(offset.getKey(), offset.getValue());
@@ -1647,7 +1647,7 @@ public class KafkaSupervisor implements Supervisor
   {
     List<ListenableFuture<?>> futures = Lists.newArrayList();
 
-    for (Map.Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingGroupList 
: pendingCompletionTaskGroups.entrySet()) {
+    for (Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingGroupList : 
pendingCompletionTaskGroups.entrySet()) {
 
       boolean stopTasksInTaskGroup = false;
       Integer groupId = pendingGroupList.getKey();
@@ -1728,9 +1728,9 @@ public class KafkaSupervisor implements Supervisor
   private void checkCurrentTaskState() throws ExecutionException, 
InterruptedException, TimeoutException
   {
     List<ListenableFuture<?>> futures = Lists.newArrayList();
-    Iterator<Map.Entry<Integer, TaskGroup>> iTaskGroups = 
taskGroups.entrySet().iterator();
+    Iterator<Entry<Integer, TaskGroup>> iTaskGroups = 
taskGroups.entrySet().iterator();
     while (iTaskGroups.hasNext()) {
-      Map.Entry<Integer, TaskGroup> taskGroupEntry = iTaskGroups.next();
+      Entry<Integer, TaskGroup> taskGroupEntry = iTaskGroups.next();
       Integer groupId = taskGroupEntry.getKey();
       TaskGroup taskGroup = taskGroupEntry.getValue();
 
@@ -1742,9 +1742,9 @@ public class KafkaSupervisor implements Supervisor
 
       log.debug("Task group [%d] pre-pruning: %s", groupId, 
taskGroup.taskIds());
 
-      Iterator<Map.Entry<String, TaskData>> iTasks = 
taskGroup.tasks.entrySet().iterator();
+      Iterator<Entry<String, TaskData>> iTasks = 
taskGroup.tasks.entrySet().iterator();
       while (iTasks.hasNext()) {
-        Map.Entry<String, TaskData> task = iTasks.next();
+        Entry<String, TaskData> task = iTasks.next();
         String taskId = task.getKey();
         TaskData taskData = task.getValue();
 
@@ -1817,7 +1817,7 @@ public class KafkaSupervisor implements Supervisor
 
     // iterate through all the current task groups and make sure each one has 
the desired number of replica tasks
     boolean createdTask = false;
-    for (Map.Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
+    for (Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
       TaskGroup taskGroup = entry.getValue();
       Integer groupId = entry.getKey();
 
@@ -1910,7 +1910,7 @@ public class KafkaSupervisor implements Supervisor
   private ImmutableMap<Integer, Long> 
generateStartingOffsetsForPartitionGroup(int groupId)
   {
     ImmutableMap.Builder<Integer, Long> builder = ImmutableMap.builder();
-    for (Map.Entry<Integer, Long> entry : 
partitionGroups.get(groupId).entrySet()) {
+    for (Entry<Integer, Long> entry : partitionGroups.get(groupId).entrySet()) 
{
       Integer partition = entry.getKey();
       Long offset = entry.getValue();
 
@@ -2035,7 +2035,7 @@ public class KafkaSupervisor implements Supervisor
     }
 
     final List<ListenableFuture<Void>> futures = Lists.newArrayList();
-    for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
+    for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
       final String taskId = entry.getKey();
       final TaskData taskData = entry.getValue();
       if (taskData.status == null) {
@@ -2121,7 +2121,7 @@ public class KafkaSupervisor implements Supervisor
 
     try {
       for (TaskGroup taskGroup : taskGroups.values()) {
-        for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
+        for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
           String taskId = entry.getKey();
           @Nullable
           DateTime startTime = entry.getValue().startTime;
@@ -2149,7 +2149,7 @@ public class KafkaSupervisor implements Supervisor
 
       for (List<TaskGroup> taskGroups : pendingCompletionTaskGroups.values()) {
         for (TaskGroup taskGroup : taskGroups) {
-          for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) 
{
+          for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
             String taskId = entry.getKey();
             @Nullable
             DateTime startTime = entry.getValue().startTime;
@@ -2218,7 +2218,7 @@ public class KafkaSupervisor implements Supervisor
         .stream()
         .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
         .flatMap(taskData -> 
taskData.getValue().currentOffsets.entrySet().stream())
-        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, 
Long::max));
+        .collect(Collectors.toMap(Entry::getKey, Entry::getValue, Long::max));
   }
 
   private Map<Integer, Long> getLagPerPartition(Map<Integer, Long> 
currentOffsets)
@@ -2228,7 +2228,7 @@ public class KafkaSupervisor implements Supervisor
         .stream()
         .collect(
             Collectors.toMap(
-                Map.Entry::getKey,
+                Entry::getKey,
                 e -> latestOffsetsFromKafka != null
                      && latestOffsetsFromKafka.get(e.getKey()) != null
                      && e.getValue() != null


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to