This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new f7753ef fix KafkaSupervisor stats report error (#6508)
f7753ef is described below
commit f7753ef1e2183722a89efcaf2ea3dd2fec494e9f
Author: Joshua Sun <[email protected]>
AuthorDate: Thu Oct 25 15:45:54 2018 -0700
fix KafkaSupervisor stats report error (#6508)
* fix kafkasupervisor stats 500
* added unit test
* throw error if group already exists
---
.../indexing/kafka/supervisor/KafkaSupervisor.java | 68 ++++++++++++++++++----
.../kafka/supervisor/KafkaSupervisorTest.java | 43 ++++++++++++++
2 files changed, 99 insertions(+), 12 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index da33d3d..fedc77f 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -2416,21 +2416,21 @@ public class KafkaSupervisor implements Supervisor
}
for (int groupId : pendingCompletionTaskGroups.keySet()) {
- TaskGroup group = taskGroups.get(groupId);
- for (String taskId : group.taskIds()) {
- futures.add(
- Futures.transform(
- taskClient.getMovingAveragesAsync(taskId),
- (Function<Map<String, Object>, StatsFromTaskResult>)
(currentStats) -> {
- return new StatsFromTaskResult(
+ List<TaskGroup> pendingGroups = pendingCompletionTaskGroups.get(groupId);
+ for (TaskGroup pendingGroup : pendingGroups) {
+ for (String taskId : pendingGroup.taskIds()) {
+ futures.add(
+ Futures.transform(
+ taskClient.getMovingAveragesAsync(taskId),
+ (Function<Map<String, Object>, StatsFromTaskResult>)
(currentStats) -> new StatsFromTaskResult(
groupId,
taskId,
currentStats
- );
- }
- )
- );
- groupAndTaskIds.add(new Pair<>(groupId, taskId));
+ )
+ )
+ );
+ groupAndTaskIds.add(new Pair<>(groupId, taskId));
+ }
}
}
@@ -2450,6 +2450,50 @@ public class KafkaSupervisor implements Supervisor
}
@VisibleForTesting
+ void addTaskGroupToActivelyReadingTaskGroup(
+ int taskGroupId,
+ ImmutableMap<Integer, Long> partitionOffsets,
+ Optional<DateTime> minMsgTime,
+ Optional<DateTime> maxMsgTime,
+ Set<String> tasks
+ )
+ {
+ TaskGroup group = new TaskGroup(
+ taskGroupId,
+ partitionOffsets,
+ minMsgTime,
+ maxMsgTime
+ );
+ group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x ->
new TaskData())));
+ if (taskGroups.putIfAbsent(taskGroupId, group) != null) {
+ throw new ISE(
+ "trying to add taskGroup with ID [%s] to actively reading task
groups, but group already exists.",
+ taskGroupId
+ );
+ }
+ }
+
+ @VisibleForTesting
+ void addTaskGroupToPendingCompletionTaskGroup(
+ int taskGroupId,
+ ImmutableMap<Integer, Long> partitionOffsets,
+ Optional<DateTime> minMsgTime,
+ Optional<DateTime> maxMsgTime,
+ Set<String> tasks
+ )
+ {
+ TaskGroup group = new TaskGroup(
+ taskGroupId,
+ partitionOffsets,
+ minMsgTime,
+ maxMsgTime
+ );
+ group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x ->
new TaskData())));
+ pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, x -> new
CopyOnWriteArrayList<>())
+ .add(group);
+ }
+
+ @VisibleForTesting
@Nullable
TaskGroup removeTaskGroup(int taskGroupId)
{
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 084696d..2379123 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import kafka.admin.AdminUtils;
@@ -2546,6 +2547,48 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(Long.MAX_VALUE, (long)
taskConfig.getEndPartitions().getPartitionOffsetMap().get(2));
}
+ @Test
+ public void testGetCurrentTotalStats() throws Exception
+ {
+ supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false);
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ supervisor.getTaskGroupIdForPartition(0),
+ ImmutableMap.of(0, 0L),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task1")
+ );
+
+ supervisor.addTaskGroupToPendingCompletionTaskGroup(
+ supervisor.getTaskGroupIdForPartition(1),
+ ImmutableMap.of(0, 0L),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("task2")
+ );
+
+
expect(taskClient.getMovingAveragesAsync("task1")).andReturn(Futures.immediateFuture(ImmutableMap.of(
+ "prop1",
+ "val1"
+ ))).times(1);
+
+
expect(taskClient.getMovingAveragesAsync("task2")).andReturn(Futures.immediateFuture(ImmutableMap.of(
+ "prop2",
+ "val2"
+ ))).times(1);
+
+ replayAll();
+
+ Map<String, Map<String, Object>> stats = supervisor.getStats();
+
+ verifyAll();
+
+ Assert.assertEquals(2, stats.size());
+ Assert.assertEquals(ImmutableSet.of("0", "1"), stats.keySet());
+ Assert.assertEquals(ImmutableMap.of("task1", ImmutableMap.of("prop1",
"val1")), stats.get("0"));
+ Assert.assertEquals(ImmutableMap.of("task2", ImmutableMap.of("prop2",
"val2")), stats.get("1"));
+ }
+
private void addSomeEvents(int numEventsPerPartition) throws Exception
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]