jon-wei closed pull request #5996: Fix NPE while handling CheckpointNotice in KafkaSupervisor
URL: https://github.com/apache/incubator-druid/pull/5996
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

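For readers skimming the diff below: the NPE arose because CheckpointNotice.handle() dereferenced a sequenceName -> TaskGroup lookup that could return null once a task group had finished reading and moved on to publishing. The patch keys checkpoint requests by taskGroupId and only checkpoints groups that are still actively reading. The following is a minimal, standalone sketch of that guard; the class, field, and method names are illustrative stand-ins, not the actual KafkaSupervisor code, which is in the diff:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Standalone sketch: the supervisor tracks actively-reading groups, publishing groups,
// and partition assignments in separate maps, all keyed by task group id.
public class CheckpointNoticeGuardSketch
{
  private final ConcurrentHashMap<Integer, String> taskGroups = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<Integer, CopyOnWriteArrayList<String>> pendingCompletionTaskGroups =
      new ConcurrentHashMap<>();
  private final ConcurrentHashMap<Integer, Map<Integer, Long>> partitionGroups = new ConcurrentHashMap<>();

  // Mirrors the shape of the patched handler: look the group up by id and ignore the
  // notice if the group has already moved on, instead of dereferencing a possibly
  // missing sequenceName -> TaskGroup entry (the old NPE).
  boolean handleCheckpointNotice(int taskGroupId)
  {
    final String taskGroup = taskGroups.get(taskGroupId);
    if (taskGroup == null) {
      if (pendingCompletionTaskGroups.containsKey(taskGroupId)) {
        System.out.printf("Ignoring checkpoint: taskGroup[%d] is already publishing%n", taskGroupId);
        return false;
      } else if (partitionGroups.containsKey(taskGroupId)) {
        System.out.printf("Ignoring checkpoint: taskGroup[%d] is inactive%n", taskGroupId);
        return false;
      } else {
        throw new IllegalStateException("Cannot find taskGroup [" + taskGroupId + "]");
      }
    }
    // ... checkpoint the still-active group here ...
    return true;
  }

  public static void main(String[] args)
  {
    CheckpointNoticeGuardSketch sketch = new CheckpointNoticeGuardSketch();
    sketch.partitionGroups.put(0, Map.of(0, 0L));        // group 0 is known but not reading
    System.out.println(sketch.handleCheckpointNotice(0)); // logged and ignored, no NPE
  }
}

In the real patch the same idea appears as KafkaSupervisor.CheckpointNotice.isValidTaskGroup(), which warns and ignores the notice for publishing or inactive groups and only raises an ISE for a truly unknown taskGroupId.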

diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
index eba35a4faff..fedda092c4d 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -53,7 +53,6 @@
 import org.joda.time.Duration;
 import org.joda.time.Interval;
 
-import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -240,11 +239,7 @@ public void reset(DataSourceMetadata dataSourceMetadata)
   }
 
   @Override
-  public void checkpoint(
-      @Nullable String sequenceName,
-      @Nullable DataSourceMetadata previousCheckPoint,
-      @Nullable DataSourceMetadata currentCheckPoint
-  )
+  public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint)
   {
     // do nothing
   }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index 3a440b14fcb..a93fde611c5 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -600,12 +600,13 @@ public void onFailure(Throwable t)
                 sequences
             );
             requestPause();
-            if (!toolbox.getTaskActionClient().submit(new CheckPointDataSourceMetadataAction(
+            final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction(
                 task.getDataSource(),
-                ioConfig.getBaseSequenceName(),
+                ioConfig.getTaskGroupId(),
                 new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())),
                 new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets))
-            ))) {
+            );
+            if (!toolbox.getTaskActionClient().submit(checkpointAction)) {
               throw new ISE("Checkpoint request with offsets [%s] failed, dying", nextOffsets);
             }
           }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
index 4dd3aaf3885..b6c1d765c95 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
@@ -26,6 +26,7 @@
 import io.druid.segment.indexing.IOConfig;
 import org.joda.time.DateTime;
 
+import javax.annotation.Nullable;
 import java.util.Map;
 
 public class KafkaIOConfig implements IOConfig
@@ -33,6 +34,8 @@
   private static final boolean DEFAULT_USE_TRANSACTION = true;
   private static final boolean DEFAULT_SKIP_OFFSET_GAPS = false;
 
+  @Nullable
+  private final Integer taskGroupId;
   private final String baseSequenceName;
   private final KafkaPartitions startPartitions;
   private final KafkaPartitions endPartitions;
@@ -44,6 +47,7 @@
 
   @JsonCreator
   public KafkaIOConfig(
+      @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // can be 
null for backward compabitility
       @JsonProperty("baseSequenceName") String baseSequenceName,
       @JsonProperty("startPartitions") KafkaPartitions startPartitions,
       @JsonProperty("endPartitions") KafkaPartitions endPartitions,
@@ -54,6 +58,7 @@ public KafkaIOConfig(
       @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
   )
   {
+    this.taskGroupId = taskGroupId;
     this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName");
     this.startPartitions = Preconditions.checkNotNull(startPartitions, "startPartitions");
     this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions");
@@ -83,6 +88,13 @@ public KafkaIOConfig(
     }
   }
 
+  @Nullable
+  @JsonProperty
+  public Integer getTaskGroupId()
+  {
+    return taskGroupId;
+  }
+
   @JsonProperty
   public String getBaseSequenceName()
   {
@@ -135,7 +147,8 @@ public boolean isSkipOffsetGaps()
   public String toString()
   {
     return "KafkaIOConfig{" +
-           "baseSequenceName='" + baseSequenceName + '\'' +
+           "taskGroupId=" + taskGroupId +
+           ", baseSequenceName='" + baseSequenceName + '\'' +
            ", startPartitions=" + startPartitions +
            ", endPartitions=" + endPartitions +
            ", consumerProperties=" + consumerProperties +
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 8ea13efad0a..ed287fa0591 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -143,8 +143,7 @@
   * time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups]
   * map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]).
    */
-  @VisibleForTesting
-  static class TaskGroup
+  private static class TaskGroup
   {
     // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
     // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
@@ -159,7 +158,7 @@
     DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action
     final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new TreeMap<>();
 
-    public TaskGroup(
+    TaskGroup(
         ImmutableMap<Integer, Long> partitionOffsets,
         Optional<DateTime> minimumMessageTime,
         Optional<DateTime> maximumMessageTime
@@ -171,7 +170,7 @@ public TaskGroup(
       this.sequenceOffsets.put(0, partitionOffsets);
     }
 
-    public int addNewCheckpoint(Map<Integer, Long> checkpoint)
+    int addNewCheckpoint(Map<Integer, Long> checkpoint)
     {
       sequenceOffsets.put(sequenceOffsets.lastKey() + 1, checkpoint);
       return sequenceOffsets.lastKey();
@@ -212,9 +211,6 @@ public int addNewCheckpoint(Map<Integer, Long> checkpoint)
   private final ConcurrentHashMap<Integer, ConcurrentHashMap<Integer, Long>> partitionGroups = new ConcurrentHashMap<>();
   // --------------------------------------------------------
 
-  // BaseSequenceName -> TaskGroup
-  private final ConcurrentHashMap<String, TaskGroup> sequenceTaskGroup = new ConcurrentHashMap<>();
-
   private final TaskStorage taskStorage;
   private final TaskMaster taskMaster;
   private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
@@ -513,13 +509,9 @@ public void reset(DataSourceMetadata dataSourceMetadata)
   }
 
   @Override
-  public void checkpoint(
-      String sequenceName,
-      DataSourceMetadata previousCheckpoint,
-      DataSourceMetadata currentCheckpoint
-  )
+  public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, DataSourceMetadata currentCheckpoint)
   {
-    Preconditions.checkNotNull(sequenceName, "Cannot checkpoint without a sequence name");
+    Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint");
     Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null");
     Preconditions.checkArgument(
         ioConfig.getTopic()
@@ -530,12 +522,14 @@ public void checkpoint(
         ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()
     );
 
-    log.info("Checkpointing [%s] for sequence [%s]", currentCheckpoint, sequenceName);
-    notices.add(new CheckpointNotice(
-        sequenceName,
-        (KafkaDataSourceMetadata) previousCheckpoint,
-        (KafkaDataSourceMetadata) currentCheckpoint
-    ));
+    log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckpoint, taskGroupId);
+    notices.add(
+        new CheckpointNotice(
+            taskGroupId,
+            (KafkaDataSourceMetadata) previousCheckpoint,
+            (KafkaDataSourceMetadata) currentCheckpoint
+        )
+    );
   }
 
   public void possiblyRegisterListener()
@@ -637,17 +631,17 @@ public void handle()
 
   private class CheckpointNotice implements Notice
   {
-    final String sequenceName;
+    final int taskGroupId;
     final KafkaDataSourceMetadata previousCheckpoint;
     final KafkaDataSourceMetadata currentCheckpoint;
 
     CheckpointNotice(
-        String sequenceName,
+        int taskGroupId,
         KafkaDataSourceMetadata previousCheckpoint,
         KafkaDataSourceMetadata currentCheckpoint
     )
     {
-      this.sequenceName = sequenceName;
+      this.taskGroupId = taskGroupId;
       this.previousCheckpoint = previousCheckpoint;
       this.currentCheckpoint = currentCheckpoint;
     }
@@ -658,17 +652,12 @@ public void handle() throws ExecutionException, InterruptedException
       // check for consistency
       // if already received request for this sequenceName and dataSourceMetadata combination then return
 
-      Preconditions.checkNotNull(
-          sequenceTaskGroup.get(sequenceName),
-          "WTH?! cannot find task group for this sequence [%s], sequencesTaskGroup map [%s], taskGroups [%s]",
-          sequenceName,
-          sequenceTaskGroup,
-          taskGroups
-      );
-      final TreeMap<Integer, Map<Integer, Long>> checkpoints = sequenceTaskGroup.get(sequenceName).sequenceOffsets;
+      final TaskGroup taskGroup = taskGroups.get(taskGroupId);
+
+      if (isValidTaskGroup(taskGroup)) {
+        final TreeMap<Integer, Map<Integer, Long>> checkpoints = taskGroup.sequenceOffsets;
 
-      // check validity of previousCheckpoint if it is not null
-      if (previousCheckpoint != null) {
+        // check validity of previousCheckpoint
         int index = checkpoints.size();
         for (int sequenceId : checkpoints.descendingKeySet()) {
           Map<Integer, Long> checkpoint = checkpoints.get(sequenceId);
@@ -685,26 +674,39 @@ public void handle() throws ExecutionException, InterruptedException
           log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue());
           return;
         }
-      } else {
-        // There cannot be more than one checkpoint when previous checkpoint is null
-        // as when the task starts they are sent existing checkpoints
-        Preconditions.checkState(
-            checkpoints.size() <= 1,
-            "Got checkpoint request with null as previous check point, however found more than one checkpoints"
+        final int taskGroupId = getTaskGroupIdForPartition(
+            currentCheckpoint.getKafkaPartitions()
+                             .getPartitionOffsetMap()
+                             .keySet()
+                             .iterator()
+                             .next()
         );
-        if (checkpoints.size() == 1) {
-          log.info("Already checkpointed with dataSourceMetadata [%s]", checkpoints.get(0));
-          return;
+        final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
+        taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
+        log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
+      }
+    }
+
+    private boolean isValidTaskGroup(@Nullable TaskGroup taskGroup)
+    {
+      if (taskGroup == null) {
+        // taskGroup might be in pendingCompletionTaskGroups or partitionGroups
+        if (pendingCompletionTaskGroups.containsKey(taskGroupId)) {
+          log.warn(
+              "Ignoring checkpoint request because taskGroup[%d] has already 
stopped indexing and is waiting for "
+              + "publishing segments",
+              taskGroupId
+          );
+          return false;
+        } else if (partitionGroups.containsKey(taskGroupId)) {
+          log.warn("Ignoring checkpoint request because taskGroup[%d] is 
inactive", taskGroupId);
+          return false;
+        } else {
+          throw new ISE("WTH?! cannot find taskGroup [%s] among all taskGroups 
[%s]", taskGroupId, taskGroups);
         }
       }
-      final int taskGroupId = getTaskGroupIdForPartition(currentCheckpoint.getKafkaPartitions()
-                                                                          .getPartitionOffsetMap()
-                                                                          .keySet()
-                                                                          .iterator()
-                                                                          .next());
-      final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
-      sequenceTaskGroup.get(sequenceName).addNewCheckpoint(newCheckpoint);
-      log.info("Handled checkpoint notice, new checkpoint is [%s] for sequence [%s]", newCheckpoint, sequenceName);
+
+      return true;
     }
   }
 
@@ -718,7 +720,6 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
       taskGroups.values().forEach(this::killTasksInGroup);
       taskGroups.clear();
       partitionGroups.clear();
-      sequenceTaskGroup.clear();
     } else if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) {
       throw new IAE("Expected KafkaDataSourceMetadata but found instance of 
[%s]", dataSourceMetadata.getClass());
     } else {
@@ -778,8 +779,7 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
           resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition -> {
             final int groupId = getTaskGroupIdForPartition(partition);
             killTaskGroupForPartitions(ImmutableSet.of(partition));
-            final TaskGroup removedGroup = taskGroups.remove(groupId);
-            sequenceTaskGroup.remove(generateSequenceName(removedGroup));
+            taskGroups.remove(groupId);
             partitionGroups.get(groupId).replaceAll((partitionId, offset) -> NOT_SET);
           });
         } else {
@@ -955,9 +955,10 @@ private void updatePartitionDataFromKafka()
     for (int partition = 0; partition < numPartitions; partition++) {
       int taskGroupId = getTaskGroupIdForPartition(partition);
 
-      partitionGroups.putIfAbsent(taskGroupId, new ConcurrentHashMap<Integer, Long>());
-
-      ConcurrentHashMap<Integer, Long> partitionMap = partitionGroups.get(taskGroupId);
+      ConcurrentHashMap<Integer, Long> partitionMap = partitionGroups.computeIfAbsent(
+          taskGroupId,
+          k -> new ConcurrentHashMap<>()
+      );
 
       // The starting offset for a new partition in [partitionGroups] is initially set to NOT_SET; when a new task group
       // is created and is assigned partitions, if the offset in [partitionGroups] is NOT_SET it will take the starting
@@ -1087,23 +1088,21 @@ public Boolean apply(KafkaIndexTask.Status status)
                             }
                             return false;
                           } else {
-                            final TaskGroup taskGroup = new TaskGroup(
-                                ImmutableMap.copyOf(
-                                    kafkaTask.getIOConfig()
-                                             .getStartPartitions()
-                                             .getPartitionOffsetMap()
-                                ), kafkaTask.getIOConfig().getMinimumMessageTime(),
-                                kafkaTask.getIOConfig().getMaximumMessageTime()
-                            );
-                            if (taskGroups.putIfAbsent(
+                            final TaskGroup taskGroup = taskGroups.computeIfAbsent(
                                 taskGroupId,
-                                taskGroup
-                            ) == null) {
-                              sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(taskGroupId));
-                              log.info("Created new task group [%d]", taskGroupId);
-                            }
+                                k -> {
+                                  log.info("Creating a new task group for taskGroupId[%d]", taskGroupId);
+                                  return new TaskGroup(
+                                      ImmutableMap.copyOf(
+                                          kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()
+                                      ),
+                                      kafkaTask.getIOConfig().getMinimumMessageTime(),
+                                      kafkaTask.getIOConfig().getMaximumMessageTime()
+                                  );
+                                }
+                            );
                             taskGroupsToVerify.add(taskGroupId);
-                            taskGroups.get(taskGroupId).tasks.putIfAbsent(taskId, new TaskData());
+                            taskGroup.tasks.putIfAbsent(taskId, new TaskData());
                           }
                         }
                         return true;
@@ -1256,7 +1255,6 @@ public void onFailure(Throwable t)
       // killing all tasks or no task left in the group ?
       // clear state about the taskgroup so that get latest offset information 
is fetched from metadata store
       log.warn("Clearing task group [%d] information as no valid tasks left 
the group", groupId);
-      sequenceTaskGroup.remove(generateSequenceName(taskGroup));
       taskGroups.remove(groupId);
       partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
     }
@@ -1281,9 +1279,10 @@ private void addDiscoveredTaskToPendingCompletionTaskGroups(
       Map<Integer, Long> startingPartitions
   )
   {
-    pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.<TaskGroup>newCopyOnWriteArrayList());
-
-    CopyOnWriteArrayList<TaskGroup> taskGroupList = pendingCompletionTaskGroups.get(groupId);
+    final CopyOnWriteArrayList<TaskGroup> taskGroupList = pendingCompletionTaskGroups.computeIfAbsent(
+        groupId,
+        k -> new CopyOnWriteArrayList<>()
+    );
     for (TaskGroup taskGroup : taskGroupList) {
       if (taskGroup.partitionOffsets.equals(startingPartitions)) {
         if (taskGroup.tasks.putIfAbsent(taskId, new TaskData()) == null) {
@@ -1411,8 +1410,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
       if (endOffsets != null) {
         // set a timeout and put this group in pendingCompletionTaskGroups so that it can be monitored for completion
         group.completionTimeout = DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
-        pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.<TaskGroup>newCopyOnWriteArrayList());
-        pendingCompletionTaskGroups.get(groupId).add(group);
+        pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group);
 
         // set endOffsets as the next startOffsets
         for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
@@ -1432,7 +1430,6 @@ private void checkTaskDuration() throws 
InterruptedException, ExecutionException
         partitionGroups.get(groupId).replaceAll((partition, offset) -> 
NOT_SET);
       }
 
-      sequenceTaskGroup.remove(generateSequenceName(group));
       // remove this task group from the list of current task groups now that 
it has been handled
       taskGroups.remove(groupId);
     }
@@ -1456,7 +1453,8 @@ private void checkTaskDuration() throws 
InterruptedException, ExecutionException
           // metadata store (which will have advanced if we succeeded in 
publishing and will remain the same if publishing
           // failed and we need to re-ingest)
           return Futures.transform(
-              stopTasksInGroup(taskGroup), new Function<Object, Map<Integer, 
Long>>()
+              stopTasksInGroup(taskGroup),
+              new Function<Object, Map<Integer, Long>>()
               {
                 @Nullable
                 @Override
@@ -1625,15 +1623,15 @@ private void checkPendingCompletionTasks() throws 
ExecutionException, Interrupte
             log.warn("All tasks in group [%d] failed to publish, killing all 
tasks for these partitions", groupId);
           } else {
             log.makeAlert(
-                "No task in [%s] succeeded before the completion timeout 
elapsed [%s]!",
+                "No task in [%s] for taskGroup [%d] succeeded before the 
completion timeout elapsed [%s]!",
                 group.taskIds(),
+                groupId,
                 ioConfig.getCompletionTimeout()
             ).emit();
           }
 
           // reset partitions offsets for this task group so that they will be 
re-read from metadata storage
           partitionGroups.get(groupId).replaceAll((partition, offset) -> 
NOT_SET);
-          sequenceTaskGroup.remove(generateSequenceName(group));
           // kill all the tasks in this pending completion group
           killTasksInGroup(group);
           // set a flag so the other pending completion groups for this set of 
partitions will also stop
@@ -1693,7 +1691,6 @@ private void checkCurrentTaskState() throws 
ExecutionException, InterruptedExcep
         // be recreated with the next set of offsets
         if (taskData.status.isSuccess()) {
           futures.add(stopTasksInGroup(taskGroup));
-          sequenceTaskGroup.remove(generateSequenceName(taskGroup));
           iTaskGroups.remove();
           break;
         }
@@ -1735,7 +1732,6 @@ void createNewTasks() throws JsonProcessingException
             groupId,
             taskGroup
         );
-        sequenceTaskGroup.put(generateSequenceName(taskGroup), 
taskGroups.get(groupId));
       }
     }
 
@@ -1778,6 +1774,7 @@ private void createKafkaTasksForGroup(int groupId, int 
replicas) throws JsonProc
     DateTime maximumMessageTime = 
taskGroups.get(groupId).maximumMessageTime.orNull();
 
     KafkaIOConfig kafkaIOConfig = new KafkaIOConfig(
+        groupId,
         sequenceName,
         new KafkaPartitions(ioConfig.getTopic(), startPartitions),
         new KafkaPartitions(ioConfig.getTopic(), endPartitions),
@@ -1944,7 +1941,7 @@ private boolean isTaskCurrent(int taskGroupId, String 
taskId)
     }
   }
 
-  private ListenableFuture<?> stopTasksInGroup(TaskGroup taskGroup)
+  private ListenableFuture<?> stopTasksInGroup(@Nullable TaskGroup taskGroup)
   {
     if (taskGroup == null) {
       return Futures.immediateFuture(null);
@@ -2289,6 +2286,28 @@ Runnable updateCurrentAndLatestOffsets()
     return allStats;
   }
 
+  @VisibleForTesting
+  @Nullable
+  TaskGroup removeTaskGroup(int taskGroupId)
+  {
+    return taskGroups.remove(taskGroupId);
+  }
+
+  @VisibleForTesting
+  void moveTaskGroupToPendingCompletion(int taskGroupId)
+  {
+    final TaskGroup taskGroup = taskGroups.remove(taskGroupId);
+    if (taskGroup != null) {
+      pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, k -> new 
CopyOnWriteArrayList<>()).add(taskGroup);
+    }
+  }
+
+  @VisibleForTesting
+  int getNoticesQueueSize()
+  {
+    return notices.size();
+  }
+
   private static class StatsFromTaskResult
   {
     private final String groupId;
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
index 3bc55e277cb..050dba753b8 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
@@ -50,6 +50,7 @@ public void testSerdeWithDefaults() throws Exception
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -82,6 +83,7 @@ public void testSerdeWithNonDefaults() throws Exception
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -118,6 +120,7 @@ public void testBaseSequenceNameRequired() throws Exception
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
                      + "  \"consumerProperties\": 
{\"bootstrap.servers\":\"localhost:9092\"},\n"
@@ -137,6 +140,7 @@ public void testStartPartitionsRequired() throws Exception
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
                      + "  \"consumerProperties\": 
{\"bootstrap.servers\":\"localhost:9092\"},\n"
@@ -156,6 +160,7 @@ public void testEndPartitionsRequired() throws Exception
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"consumerProperties\": 
{\"bootstrap.servers\":\"localhost:9092\"},\n"
@@ -175,6 +180,7 @@ public void testConsumerPropertiesRequired() throws 
Exception
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -194,6 +200,7 @@ public void testStartAndEndTopicMatch() throws Exception
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"other\", 
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -214,6 +221,7 @@ public void testStartAndEndPartitionSetMatch() throws 
Exception
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":15}},\n"
@@ -234,6 +242,7 @@ public void testEndOffsetGreaterThanStart() throws Exception
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":15, \"1\":2}},\n"
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index 15f4bb07775..411fff9168e 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -262,21 +262,21 @@ public KafkaIndexTaskTest(boolean 
isIncrementalHandoffSupported)
   private static List<ProducerRecord<byte[], byte[]>> generateRecords(String 
topic)
   {
     return ImmutableList.of(
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2008", "a", 
"y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2009", "b", 
"y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2010", "c", 
"y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2011", "d", 
"y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2011", "e", 
"y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, 
JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, 
StringUtils.toUtf8("unparseable")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, 
StringUtils.toUtf8("unparseable2")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, null),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2013", "f", 
"y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2049", "f", 
"y", "notanumber", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2049", "f", 
"y", "10", "notanumber", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2049", "f", 
"y", "10", "20.0", "notanumber")),
-        new ProducerRecord<byte[], byte[]>(topic, 1, null, JB("2012", "g", 
"y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 1, null, JB("2011", "h", 
"y", "10", "20.0", "1.0"))
+        new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10", 
"20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10", 
"20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10", 
"20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10", 
"20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2011", "e", "y", "10", 
"20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, 
JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, 
StringUtils.toUtf8("unparseable")),
+        new ProducerRecord<>(topic, 0, null, 
StringUtils.toUtf8("unparseable2")),
+        new ProducerRecord<>(topic, 0, null, null),
+        new ProducerRecord<>(topic, 0, null, JB("2013", "f", "y", "10", 
"20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", 
"notanumber", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "10", 
"notanumber", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "10", 
"20.0", "notanumber")),
+        new ProducerRecord<>(topic, 1, null, JB("2012", "g", "y", "10", 
"20.0", "1.0")),
+        new ProducerRecord<>(topic, 1, null, JB("2011", "h", "y", "10", 
"20.0", "1.0"))
     );
   }
 
@@ -377,6 +377,7 @@ public void testRunAfterDataInserted() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -418,6 +419,7 @@ public void testRunBeforeDataInserted() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -493,6 +495,7 @@ public void testIncrementalHandOff() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             baseSequenceName,
             startPartitions,
             endPartitions,
@@ -514,14 +517,16 @@ public void testIncrementalHandOff() throws Exception
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
 
     Assert.assertEquals(1, checkpointRequestsHash.size());
-    Assert.assertTrue(checkpointRequestsHash.contains(
-        Objects.hash(
-            DATA_SCHEMA.getDataSource(),
-            baseSequenceName,
-            new KafkaDataSourceMetadata(startPartitions),
-            new KafkaDataSourceMetadata(new KafkaPartitions(topic, 
currentOffsets))
+    Assert.assertTrue(
+        checkpointRequestsHash.contains(
+            Objects.hash(
+                DATA_SCHEMA.getDataSource(),
+                0,
+                new KafkaDataSourceMetadata(startPartitions),
+                new KafkaDataSourceMetadata(new KafkaPartitions(topic, 
currentOffsets))
+            )
         )
-    ));
+    );
 
     // Check metrics
     Assert.assertEquals(8, 
task.getRunner().getRowIngestionMeters().getProcessed());
@@ -581,6 +586,7 @@ public void testTimeBasedIncrementalHandOff() throws 
Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             baseSequenceName,
             startPartitions,
             endPartitions,
@@ -603,14 +609,16 @@ public void testTimeBasedIncrementalHandOff() throws 
Exception
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
 
     Assert.assertEquals(1, checkpointRequestsHash.size());
-    Assert.assertTrue(checkpointRequestsHash.contains(
-        Objects.hash(
-            DATA_SCHEMA.getDataSource(),
-            baseSequenceName,
-            new KafkaDataSourceMetadata(startPartitions),
-            new KafkaDataSourceMetadata(new KafkaPartitions(topic, 
checkpoint.getPartitionOffsetMap()))
+    Assert.assertTrue(
+        checkpointRequestsHash.contains(
+            Objects.hash(
+                DATA_SCHEMA.getDataSource(),
+                0,
+                new KafkaDataSourceMetadata(startPartitions),
+                new KafkaDataSourceMetadata(new KafkaPartitions(topic, 
checkpoint.getPartitionOffsetMap()))
+            )
         )
-    ));
+    );
 
     // Check metrics
     Assert.assertEquals(2, 
task.getRunner().getRowIngestionMeters().getProcessed());
@@ -637,6 +645,7 @@ public void testRunWithMinimumMessageTime() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -690,6 +699,7 @@ public void testRunWithMaximumMessageTime() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -753,6 +763,7 @@ public void testRunWithTransformSpec() throws Exception
             )
         ),
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -812,6 +823,7 @@ public void testRunOnNothing() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
@@ -852,6 +864,7 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() 
throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -903,6 +916,7 @@ public void 
testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -957,6 +971,7 @@ public void testReportParseExceptions() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 7L)),
@@ -1000,6 +1015,7 @@ public void testMultipleParseExceptionsSuccess() throws 
Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 13L)),
@@ -1081,6 +1097,7 @@ public void testMultipleParseExceptionsFailure() throws 
Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
@@ -1140,6 +1157,7 @@ public void testRunReplicas() throws Exception
     final KafkaIndexTask task1 = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1153,6 +1171,7 @@ public void testRunReplicas() throws Exception
     final KafkaIndexTask task2 = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1206,6 +1225,7 @@ public void testRunConflicting() throws Exception
     final KafkaIndexTask task1 = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1219,6 +1239,7 @@ public void testRunConflicting() throws Exception
     final KafkaIndexTask task2 = createTask(
         null,
         new KafkaIOConfig(
+            1,
             "sequence1",
             new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
@@ -1273,6 +1294,7 @@ public void testRunConflictingWithoutTransactions() 
throws Exception
     final KafkaIndexTask task1 = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1286,6 +1308,7 @@ public void testRunConflictingWithoutTransactions() 
throws Exception
     final KafkaIndexTask task2 = createTask(
         null,
         new KafkaIOConfig(
+            1,
             "sequence1",
             new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
@@ -1345,6 +1368,7 @@ public void testRunOneTaskTwoPartitions() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L, 1, 0L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 2L)),
@@ -1409,6 +1433,7 @@ public void testRunTwoTasksTwoPartitions() throws 
Exception
     final KafkaIndexTask task1 = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1422,6 +1447,7 @@ public void testRunTwoTasksTwoPartitions() throws 
Exception
     final KafkaIndexTask task2 = createTask(
         null,
         new KafkaIOConfig(
+            1,
             "sequence1",
             new KafkaPartitions(topic, ImmutableMap.of(1, 0L)),
             new KafkaPartitions(topic, ImmutableMap.of(1, 1L)),
@@ -1477,6 +1503,7 @@ public void testRestore() throws Exception
     final KafkaIndexTask task1 = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1513,6 +1540,7 @@ public void testRestore() throws Exception
     final KafkaIndexTask task2 = createTask(
         task1.getId(),
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1564,6 +1592,7 @@ public void testRunWithPauseAndResume() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1647,6 +1676,7 @@ public void 
testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1685,6 +1715,7 @@ public void 
testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 200L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 500L)),
@@ -1737,6 +1768,7 @@ public void 
testRunContextSequenceAheadOfStartingOffsets() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             // task should ignore these and use sequence info sent in the 
context
             new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
@@ -2026,18 +2058,20 @@ private void makeToolboxFactory() throws IOException
           @Override
           public boolean checkPointDataSourceMetadata(
               String supervisorId,
-              @Nullable String sequenceName,
+              int taskGroupId,
               @Nullable DataSourceMetadata previousDataSourceMetadata,
               @Nullable DataSourceMetadata currentDataSourceMetadata
           )
           {
             log.info("Adding checkpoint hash to the set");
-            checkpointRequestsHash.add(Objects.hash(
-                supervisorId,
-                sequenceName,
-                previousDataSourceMetadata,
-                currentDataSourceMetadata
-            ));
+            checkpointRequestsHash.add(
+                Objects.hash(
+                    supervisorId,
+                    taskGroupId,
+                    previousDataSourceMetadata,
+                    currentDataSourceMetadata
+                )
+            );
             return true;
           }
         }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 86648fef282..f0f6033eea3 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -19,6 +19,7 @@
 
 package io.druid.indexing.kafka.supervisor;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
@@ -61,6 +62,7 @@
 import io.druid.java.util.common.granularity.Granularities;
 import io.druid.java.util.common.parsers.JSONPathFieldSpec;
 import io.druid.java.util.common.parsers.JSONPathSpec;
+import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
 import io.druid.segment.TestHelper;
@@ -70,6 +72,7 @@
 import io.druid.segment.realtime.FireDepartment;
 import io.druid.server.metrics.DruidMonitorSchedulerConfig;
 import io.druid.server.metrics.NoopServiceEmitter;
+import io.druid.server.metrics.ExceptionCapturingServiceEmitter;
 import org.apache.curator.test.TestingCluster;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -99,7 +102,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
 
 import static org.easymock.EasyMock.anyBoolean;
 import static org.easymock.EasyMock.anyObject;
@@ -141,6 +146,7 @@
   private TaskQueue taskQueue;
   private String topic;
   private RowIngestionMetersFactory rowIngestionMetersFactory;
+  private ExceptionCapturingServiceEmitter serviceEmitter;
 
   private static String getTopic()
   {
@@ -213,6 +219,8 @@ public void setupTest()
 
     topic = getTopic();
     rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
+    serviceEmitter = new ExceptionCapturingServiceEmitter();
+    EmittingLogger.registerEmitter(serviceEmitter);
   }
 
   @After
@@ -553,7 +561,7 @@ public void testKillIncompatibleTasks() throws Exception
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "index_kafka_testDS__some_other_sequenceName",
+        1,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L)),
         null,
@@ -564,7 +572,7 @@ public void testKillIncompatibleTasks() throws Exception
     Task id2 = createKafkaIndexTask(
         "id2",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 
333L)),
         null,
@@ -575,7 +583,7 @@ public void testKillIncompatibleTasks() throws Exception
     Task id3 = createKafkaIndexTask(
         "id3",
         DATASOURCE,
-        "index_kafka_testDS__some_other_sequenceName",
+        1,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 1L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 
330L)),
         null,
@@ -586,7 +594,7 @@ public void testKillIncompatibleTasks() throws Exception
     Task id4 = createKafkaIndexTask(
         "id4",
         "other-datasource",
-        "index_kafka_testDS_d927edff33c4b3f",
+        2,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L)),
         null,
@@ -634,7 +642,9 @@ public void testKillIncompatibleTasks() throws Exception
 
     TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
     checkpoints.put(0, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), 
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), 
anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(2);
 
     replayAll();
 
@@ -652,7 +662,7 @@ public void testKillBadPartitionAssignment() throws 
Exception
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)),
         null,
@@ -661,7 +671,7 @@ public void testKillBadPartitionAssignment() throws 
Exception
     Task id2 = createKafkaIndexTask(
         "id2",
         DATASOURCE,
-        "sequenceName-1",
+        1,
         new KafkaPartitions("topic", ImmutableMap.of(1, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE)),
         null,
@@ -670,7 +680,7 @@ public void testKillBadPartitionAssignment() throws 
Exception
     Task id3 = createKafkaIndexTask(
         "id3",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -679,7 +689,7 @@ public void testKillBadPartitionAssignment() throws 
Exception
     Task id4 = createKafkaIndexTask(
         "id4",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE)),
         null,
@@ -688,7 +698,7 @@ public void testKillBadPartitionAssignment() throws 
Exception
     Task id5 = createKafkaIndexTask(
         "id5",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)),
         null,
@@ -727,8 +737,12 @@ public void testKillBadPartitionAssignment() throws 
Exception
     checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), 
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)).times(1);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), 
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), 
anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), 
anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(1);
 
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), 
anyObject(Executor.class));
     taskQueue.shutdown("id4");
@@ -765,10 +779,12 @@ public void testRequeueTaskWhenFailed() throws Exception
     checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), 
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-                                                                               
         .anyTimes();
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), 
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-                                                                               
         .anyTimes();
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), 
anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .anyTimes();
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), 
anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .anyTimes();
 
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), 
anyObject(Executor.class));
     replayAll();
@@ -830,7 +846,7 @@ public void testRequeueAdoptedTaskWhenFailed() throws 
Exception
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)),
         now,
@@ -857,7 +873,9 @@ public void testRequeueAdoptedTaskWhenFailed() throws 
Exception
 
     TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
     checkpoints.put(0, ImmutableMap.of(0, 0L, 2, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), 
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), 
anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(2);
 
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), 
anyObject(Executor.class));
     replayAll();
@@ -878,9 +896,12 @@ public void testRequeueAdoptedTaskWhenFailed() throws 
Exception
     reset(taskClient);
 
     // for the newly created replica task
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), 
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints))
-                                                                               
         .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), 
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), 
anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), 
anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
 
     
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(captured.getValue())).anyTimes();
     
expect(taskStorage.getStatus(iHaveFailed.getId())).andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId())));
@@ -953,10 +974,12 @@ public void testQueueNextTasksOnSuccess() throws Exception
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
     // there would be 4 tasks, 2 for each task group
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-                                                                                             .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-                                                                                             .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(2);
 
     expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
     for (Task task : tasks) {
@@ -1063,10 +1086,12 @@ public void testBeginPublishAndQueueNextTasks() throws Exception
     checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-                                                                                             .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-                                                                                             .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(2);
 
     replay(taskStorage, taskRunner, taskClient, taskQueue);
 
@@ -1100,7 +1125,7 @@ public void testDiscoverExistingPublishingTask() throws Exception
     Task task = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1192,7 +1217,7 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation()
     Task task = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)),
         null,
@@ -1282,7 +1307,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1292,7 +1317,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception
     Task id2 = createKafkaIndexTask(
         "id2",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 1L, 1, 2L, 2, 3L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1330,7 +1355,9 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception
     // since id1 is publishing, so getCheckpoints wouldn't be called for it
     TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
     checkpoints.put(0, ImmutableMap.of(0, 1L, 1, 2L, 2, 3L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
 
     replayAll();
 
@@ -1404,10 +1431,12 @@ public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception
     checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-                                                                                             .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-                                                                                             .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(2);
 
     expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
     for (Task task : tasks) {
@@ -1463,10 +1492,12 @@ public void testKillUnresponsiveTasksWhilePausing() throws Exception
     checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-                                                                                             .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-                                                                                             .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(2);
 
     captured = Capture.newInstance(CaptureType.ALL);
     expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
@@ -1540,10 +1571,12 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception
     checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-                                                                                             .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-                                                                                             .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(2);
 
     captured = Capture.newInstance(CaptureType.ALL);
     expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
@@ -1622,7 +1655,7 @@ public void testStopGracefully() throws Exception
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1632,7 +1665,7 @@ public void testStopGracefully() throws Exception
     Task id2 = createKafkaIndexTask(
         "id2",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1642,7 +1675,7 @@ public void testStopGracefully() throws Exception
     Task id3 = createKafkaIndexTask(
         "id3",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1678,8 +1711,12 @@ public void testStopGracefully() throws Exception
     // getCheckpoints will not be called for id1 as it is in publishing state
     TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
     checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
 
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     replayAll();
@@ -1824,7 +1861,7 @@ public void testResetRunningTasks() throws Exception
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1834,7 +1871,7 @@ public void testResetRunningTasks() throws Exception
     Task id2 = createKafkaIndexTask(
         "id2",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1844,7 +1881,7 @@ public void testResetRunningTasks() throws Exception
     Task id3 = createKafkaIndexTask(
         "id3",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1879,8 +1916,12 @@ public void testResetRunningTasks() throws Exception
 
     TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
     checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
 
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     replayAll();
@@ -1908,7 +1949,7 @@ public void testNoDataIngestionTasks() throws Exception
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1918,7 +1959,7 @@ public void testNoDataIngestionTasks() throws Exception
     Task id2 = createKafkaIndexTask(
         "id2",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1928,7 +1969,7 @@ public void testNoDataIngestionTasks() throws Exception
     Task id3 = createKafkaIndexTask(
         "id3",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1958,9 +1999,15 @@ public void testNoDataIngestionTasks() throws Exception
 
     TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
     checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
 
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     replayAll();
@@ -1980,6 +2027,172 @@ public void testNoDataIngestionTasks() throws Exception
     verifyAll();
   }
 
+  @Test(timeout = 60_000L)
+  public void testCheckpointForInactiveTaskGroup()
+      throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
+  {
+    supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+    //not adding any events
+    final Task id1 = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id2 = createKafkaIndexTask(
+        "id2",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id3 = createKafkaIndexTask(
+        "id3",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+    expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+    expect(
+        indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)
+    ).anyTimes();
+    expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+    expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+    expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+
+    final DateTime startTime = DateTimes.nowUtc();
+    expect(taskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime));
+    expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
+    expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));
+
+    final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+    checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+
+    taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+
+    final Map<Integer, Long> fakeCheckpoints = Collections.emptyMap();
+    supervisor.moveTaskGroupToPendingCompletion(0);
+    supervisor.checkpoint(
+        0,
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, fakeCheckpoints))
+    );
+
+    while (supervisor.getNoticesQueueSize() > 0) {
+      Thread.sleep(100);
+    }
+
+    verifyAll();
+
+    Assert.assertNull(serviceEmitter.getStackTrace());
+    Assert.assertNull(serviceEmitter.getExceptionMessage());
+    Assert.assertNull(serviceEmitter.getExceptionClass());
+  }
+
+  @Test(timeout = 60_000L)
+  public void testCheckpointForUnknownTaskGroup() throws InterruptedException
+  {
+    supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+    //not adding any events
+    final Task id1 = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id2 = createKafkaIndexTask(
+        "id2",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id3 = createKafkaIndexTask(
+        "id3",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+    expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+    expect(
+        indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)
+    ).anyTimes();
+
+    replayAll();
+
+    supervisor.start();
+
+    supervisor.checkpoint(
+        0,
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap()))
+    );
+
+    while (supervisor.getNoticesQueueSize() > 0) {
+      Thread.sleep(100);
+    }
+
+    verifyAll();
+
+    Assert.assertNotNull(serviceEmitter.getStackTrace());
+    Assert.assertEquals(
+        "WTH?! cannot find taskGroup [0] among all taskGroups [{}]",
+        serviceEmitter.getExceptionMessage()
+    );
+    Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
+  }
+
   private void addSomeEvents(int numEventsPerPartition) throws Exception
   {
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
@@ -2106,7 +2319,7 @@ private static DataSchema getDataSchema(String dataSource)
   private KafkaIndexTask createKafkaIndexTask(
       String id,
       String dataSource,
-      String sequenceName,
+      int taskGroupId,
       KafkaPartitions startPartitions,
       KafkaPartitions endPartitions,
       DateTime minimumMessageTime,
@@ -2119,7 +2332,8 @@ private KafkaIndexTask createKafkaIndexTask(
         getDataSchema(dataSource),
         tuningConfig,
         new KafkaIOConfig(
-            sequenceName,
+            taskGroupId,
+            "sequenceName-" + taskGroupId,
             startPartitions,
             endPartitions,
             ImmutableMap.<String, String>of(),
@@ -2128,7 +2342,7 @@ private KafkaIndexTask createKafkaIndexTask(
             maximumMessageTime,
             false
         ),
-        ImmutableMap.<String, Object>of(),
+        Collections.emptyMap(),
         null,
         null,
         rowIngestionMetersFactory
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
index 8265f87c7b9..f1d11deb4ea 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
@@ -21,27 +21,28 @@
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.overlord.DataSourceMetadata;
 
 public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
 {
   private final String supervisorId;
-  private final String sequenceName;
+  private final int taskGroupId;
   private final DataSourceMetadata previousCheckPoint;
   private final DataSourceMetadata currentCheckPoint;
 
   public CheckPointDataSourceMetadataAction(
       @JsonProperty("supervisorId") String supervisorId,
-      @JsonProperty("sequenceName") String sequenceName,
+      @JsonProperty("taskGroupId") Integer taskGroupId,
       @JsonProperty("previousCheckPoint") DataSourceMetadata 
previousCheckPoint,
       @JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint
   )
   {
-    this.supervisorId = supervisorId;
-    this.sequenceName = sequenceName;
-    this.previousCheckPoint = previousCheckPoint;
-    this.currentCheckPoint = currentCheckPoint;
+    this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId");
+    this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId");
+    this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint");
+    this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint");
   }
 
   @JsonProperty
@@ -51,9 +52,9 @@ public String getSupervisorId()
   }
 
   @JsonProperty
-  public String getSequenceName()
+  public int getTaskGroupId()
   {
-    return sequenceName;
+    return taskGroupId;
   }
 
   @JsonProperty
@@ -81,8 +82,12 @@ public Boolean perform(
       Task task, TaskActionToolbox toolbox
   )
   {
-    return toolbox.getSupervisorManager()
-                  .checkPointDataSourceMetadata(supervisorId, sequenceName, previousCheckPoint, currentCheckPoint);
+    return toolbox.getSupervisorManager().checkPointDataSourceMetadata(
+        supervisorId,
+        taskGroupId,
+        previousCheckPoint,
+        currentCheckPoint
+    );
   }
 
   @Override
@@ -96,7 +101,7 @@ public String toString()
   {
     return "CheckPointDataSourceMetadataAction{" +
            "supervisorId='" + supervisorId + '\'' +
-           ", sequenceName='" + sequenceName + '\'' +
+           ", taskGroupId='" + taskGroupId + '\'' +
            ", previousCheckPoint=" + previousCheckPoint +
            ", currentCheckPoint=" + currentCheckPoint +
            '}';
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
index dcdd014c95c..f9a55644432 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -165,9 +165,9 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourc
 
   public boolean checkPointDataSourceMetadata(
       String supervisorId,
-      @Nullable String sequenceName,
-      @Nullable DataSourceMetadata previousDataSourceMetadata,
-      @Nullable DataSourceMetadata currentDataSourceMetadata
+      int taskGroupId,
+      DataSourceMetadata previousDataSourceMetadata,
+      DataSourceMetadata currentDataSourceMetadata
   )
   {
     try {
@@ -178,7 +178,7 @@ public boolean checkPointDataSourceMetadata(
 
       Preconditions.checkNotNull(supervisor, "supervisor could not be found");
 
-      supervisor.lhs.checkpoint(sequenceName, previousDataSourceMetadata, currentDataSourceMetadata);
+      supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata, currentDataSourceMetadata);
       return true;
     }
     catch (Exception e) {
diff --git a/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java b/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java
index 0020cf1c79f..661ed17d3b9 100644
--- a/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java
+++ b/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java
@@ -35,6 +35,10 @@
  */
 public class EmittingLogger extends Logger
 {
+  public static final String EXCEPTION_TYPE_KEY = "exceptionType";
+  public static final String EXCEPTION_MESSAGE_KEY = "exceptionMessage";
+  public static final String EXCEPTION_STACK_TRACE_KEY = "exceptionStackTrace";
+
   private static volatile ServiceEmitter emitter = null;
 
   private final String className;
diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index 26ed99cd543..0408104cde8 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -83,9 +83,9 @@ public void reset(DataSourceMetadata dataSourceMetadata) {}
 
       @Override
       public void checkpoint(
-          @Nullable String sequenceName,
-          @Nullable DataSourceMetadata previousCheckPoint,
-          @Nullable DataSourceMetadata currentCheckPoint
+          int taskGroupId,
+          DataSourceMetadata previousCheckPoint,
+          DataSourceMetadata currentCheckPoint
       )
       {
 
diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
index 5afe9122991..04afac7aea6 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
@@ -22,7 +22,6 @@
 import com.google.common.collect.ImmutableMap;
 import io.druid.indexing.overlord.DataSourceMetadata;
 
-import javax.annotation.Nullable;
 import java.util.Map;
 
 public interface Supervisor
@@ -52,13 +51,9 @@
   * for example - Kafka Supervisor uses this to merge and handoff segments containing at least the data
    * represented by {@param currentCheckpoint} DataSourceMetadata
    *
-   * @param sequenceName       unique Identifier to figure out for which sequence to do checkpointing
+   * @param taskGroupId        unique Identifier to figure out for which sequence to do checkpointing
    * @param previousCheckPoint DataSourceMetadata checkpointed in previous call
    * @param currentCheckPoint  current DataSourceMetadata to be checkpointed
    */
-  void checkpoint(
-      @Nullable String sequenceName,
-      @Nullable DataSourceMetadata previousCheckPoint,
-      @Nullable DataSourceMetadata currentCheckPoint
-  );
+  void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint);
 }
diff --git a/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java b/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java
new file mode 100644
index 00000000000..cc217c6f5b0
--- /dev/null
+++ b/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.server.metrics;
+
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.core.Event;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+public class ExceptionCapturingServiceEmitter extends ServiceEmitter
+{
+  private volatile Class exceptionClass;
+  private volatile String exceptionMessage;
+  private volatile String stackTrace;
+
+  public ExceptionCapturingServiceEmitter()
+  {
+    super("", "", null);
+  }
+
+  @Override
+  public void emit(Event event)
+  {
+    //noinspection unchecked
+    final Map<String, Object> dataMap = (Map<String, Object>) event.toMap().get("data");
+    final Class exceptionClass = (Class) dataMap.get(EmittingLogger.EXCEPTION_TYPE_KEY);
+    if (exceptionClass != null) {
+      final String exceptionMessage = (String) dataMap.get(EmittingLogger.EXCEPTION_MESSAGE_KEY);
+      final String stackTrace = (String) dataMap.get(EmittingLogger.EXCEPTION_STACK_TRACE_KEY);
+      this.exceptionClass = exceptionClass;
+      this.exceptionMessage = exceptionMessage;
+      this.stackTrace = stackTrace;
+    }
+  }
+
+  @Nullable
+  public Class getExceptionClass()
+  {
+    return exceptionClass;
+  }
+
+  @Nullable
+  public String getExceptionMessage()
+  {
+    return exceptionMessage;
+  }
+
+  @Nullable
+  public String getStackTrace()
+  {
+    return stackTrace;
+  }
+}


 
