This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f7753ef  fix KafkaSupervisor stats report error (#6508)
f7753ef is described below

commit f7753ef1e2183722a89efcaf2ea3dd2fec494e9f
Author: Joshua Sun <[email protected]>
AuthorDate: Thu Oct 25 15:45:54 2018 -0700

    fix KafkaSupervisor stats report error (#6508)
    
    * fix kafkasupervisor stats 500
    
    * added unit test
    
    * throw error if group already exists
---
 .../indexing/kafka/supervisor/KafkaSupervisor.java | 68 ++++++++++++++++++----
 .../kafka/supervisor/KafkaSupervisorTest.java      | 43 ++++++++++++++
 2 files changed, 99 insertions(+), 12 deletions(-)

diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index da33d3d..fedc77f 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -2416,21 +2416,21 @@ public class KafkaSupervisor implements Supervisor
     }
 
     for (int groupId : pendingCompletionTaskGroups.keySet()) {
-      TaskGroup group = taskGroups.get(groupId);
-      for (String taskId : group.taskIds()) {
-        futures.add(
-            Futures.transform(
-                taskClient.getMovingAveragesAsync(taskId),
-                (Function<Map<String, Object>, StatsFromTaskResult>) 
(currentStats) -> {
-                  return new StatsFromTaskResult(
+      List<TaskGroup> pendingGroups = pendingCompletionTaskGroups.get(groupId);
+      for (TaskGroup pendingGroup : pendingGroups) {
+        for (String taskId : pendingGroup.taskIds()) {
+          futures.add(
+              Futures.transform(
+                  taskClient.getMovingAveragesAsync(taskId),
+                  (Function<Map<String, Object>, StatsFromTaskResult>) 
(currentStats) -> new StatsFromTaskResult(
                       groupId,
                       taskId,
                       currentStats
-                  );
-                }
-            )
-        );
-        groupAndTaskIds.add(new Pair<>(groupId, taskId));
+                  )
+              )
+          );
+          groupAndTaskIds.add(new Pair<>(groupId, taskId));
+        }
       }
     }
 
@@ -2450,6 +2450,50 @@ public class KafkaSupervisor implements Supervisor
   }
 
   @VisibleForTesting
+  void addTaskGroupToActivelyReadingTaskGroup(
+      int taskGroupId,
+      ImmutableMap<Integer, Long> partitionOffsets,
+      Optional<DateTime> minMsgTime,
+      Optional<DateTime> maxMsgTime,
+      Set<String> tasks
+  )
+  {
+    TaskGroup group = new TaskGroup(
+        taskGroupId,
+        partitionOffsets,
+        minMsgTime,
+        maxMsgTime
+    );
+    group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> 
new TaskData())));
+    if (taskGroups.putIfAbsent(taskGroupId, group) != null) {
+      throw new ISE(
+          "trying to add taskGroup with ID [%s] to actively reading task 
groups, but group already exists.",
+          taskGroupId
+      );
+    }
+  }
+
+  @VisibleForTesting
+  void addTaskGroupToPendingCompletionTaskGroup(
+      int taskGroupId,
+      ImmutableMap<Integer, Long> partitionOffsets,
+      Optional<DateTime> minMsgTime,
+      Optional<DateTime> maxMsgTime,
+      Set<String> tasks
+  )
+  {
+    TaskGroup group = new TaskGroup(
+        taskGroupId,
+        partitionOffsets,
+        minMsgTime,
+        maxMsgTime
+    );
+    group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> 
new TaskData())));
+    pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, x -> new 
CopyOnWriteArrayList<>())
+                               .add(group);
+  }
+
+  @VisibleForTesting
   @Nullable
   TaskGroup removeTaskGroup(int taskGroupId)
   {
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 084696d..2379123 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import kafka.admin.AdminUtils;
@@ -2546,6 +2547,48 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals(Long.MAX_VALUE, (long) 
taskConfig.getEndPartitions().getPartitionOffsetMap().get(2));
   }
 
+  @Test
+  public void testGetCurrentTotalStats() throws Exception
+  {
+    supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false);
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        supervisor.getTaskGroupIdForPartition(0),
+        ImmutableMap.of(0, 0L),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task1")
+    );
+
+    supervisor.addTaskGroupToPendingCompletionTaskGroup(
+        supervisor.getTaskGroupIdForPartition(1),
+        ImmutableMap.of(0, 0L),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task2")
+    );
+
+    
expect(taskClient.getMovingAveragesAsync("task1")).andReturn(Futures.immediateFuture(ImmutableMap.of(
+        "prop1",
+        "val1"
+    ))).times(1);
+
+    
expect(taskClient.getMovingAveragesAsync("task2")).andReturn(Futures.immediateFuture(ImmutableMap.of(
+        "prop2",
+        "val2"
+    ))).times(1);
+
+    replayAll();
+
+    Map<String, Map<String, Object>> stats = supervisor.getStats();
+
+    verifyAll();
+
+    Assert.assertEquals(2, stats.size());
+    Assert.assertEquals(ImmutableSet.of("0", "1"), stats.keySet());
+    Assert.assertEquals(ImmutableMap.of("task1", ImmutableMap.of("prop1", 
"val1")), stats.get("0"));
+    Assert.assertEquals(ImmutableMap.of("task2", ImmutableMap.of("prop2", 
"val2")), stats.get("1"));
+  }
+
 
   private void addSomeEvents(int numEventsPerPartition) throws Exception
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to