This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c48aa74  Fix NPE while handling CheckpointNotice in KafkaSupervisor (#5996)
c48aa74 is described below

commit c48aa74a301a11f49b0d6ba6bde4283bdab7f699
Author: Jihoon Son <[email protected]>
AuthorDate: Fri Jul 13 17:14:57 2018 -0700

    Fix NPE while handling CheckpointNotice in KafkaSupervisor (#5996)
    
    * Fix NPE while handling CheckpointNotice
    
    * fix code style
    
    * Fix test
    
    * fix test
    
    * add a log for creating a new taskGroup
    
    * fix backward compatibility in KafkaIOConfig
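    
    For context, the gist of the fix as a minimal standalone sketch (simplified
    types; the real logic lives in KafkaSupervisor.CheckpointNotice and the
    isValidTaskGroup method in the diff below): checkpoint notices are now keyed
    by taskGroupId instead of by sequence name, and the looked-up task group is
    validated before being dereferenced, so a notice arriving after the group
    has moved to pending-completion is ignored rather than triggering an NPE.
    
        import java.util.Map;
        import java.util.concurrent.ConcurrentHashMap;
        
        class CheckpointGuardSketch
        {
          static class TaskGroup { /* sequence offsets, tasks, ... */ }
        
          private final Map<Integer, TaskGroup> taskGroups = new ConcurrentHashMap<>();
          private final Map<Integer, Object> pendingCompletionTaskGroups = new ConcurrentHashMap<>();
          private final Map<Integer, Object> partitionGroups = new ConcurrentHashMap<>();
        
          void handleCheckpointNotice(int taskGroupId)
          {
            final TaskGroup taskGroup = taskGroups.get(taskGroupId);
            if (!isValidTaskGroup(taskGroupId, taskGroup)) {
              return; // stale notice: ignore it instead of dereferencing null
            }
            // ... compute the next checkpoint and record it on taskGroup ...
          }
        
          private boolean isValidTaskGroup(int taskGroupId, TaskGroup taskGroup)
          {
            if (taskGroup == null) {
              if (pendingCompletionTaskGroups.containsKey(taskGroupId)) {
                return false; // group stopped reading and is waiting to publish segments
              } else if (partitionGroups.containsKey(taskGroupId)) {
                return false; // group is known but currently inactive
              } else {
                throw new IllegalStateException("cannot find taskGroup " + taskGroupId);
              }
            }
            return true;
          }
        }
    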
---
 .../MaterializedViewSupervisor.java                |   7 +-
 .../IncrementalPublishingKafkaIndexTaskRunner.java |   7 +-
 .../io/druid/indexing/kafka/KafkaIOConfig.java     |  15 +-
 .../indexing/kafka/supervisor/KafkaSupervisor.java | 185 ++++++-----
 .../io/druid/indexing/kafka/KafkaIOConfigTest.java |   9 +
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   | 106 ++++---
 .../kafka/supervisor/KafkaSupervisorTest.java      | 344 +++++++++++++++++----
 .../CheckPointDataSourceMetadataAction.java        |  27 +-
 .../overlord/supervisor/SupervisorManager.java     |   8 +-
 .../io/druid/java/util/emitter/EmittingLogger.java |   4 +
 .../overlord/supervisor/NoopSupervisorSpec.java    |   6 +-
 .../indexing/overlord/supervisor/Supervisor.java   |   9 +-
 .../metrics/ExceptionCapturingServiceEmitter.java  |  71 +++++
 13 files changed, 579 insertions(+), 219 deletions(-)

diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
index eba35a4..fedda09 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -53,7 +53,6 @@ import io.druid.timeline.DataSegment;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
 
-import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -240,11 +239,7 @@ public class MaterializedViewSupervisor implements Supervisor
   }
 
   @Override
-  public void checkpoint(
-      @Nullable String sequenceName,
-      @Nullable DataSourceMetadata previousCheckPoint,
-      @Nullable DataSourceMetadata currentCheckPoint
-  )
+  public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint)
   {
     // do nothing
   }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index 3a440b1..a93fde6 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -600,12 +600,13 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
                 sequences
             );
             requestPause();
-            if (!toolbox.getTaskActionClient().submit(new CheckPointDataSourceMetadataAction(
+            final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction(
                 task.getDataSource(),
-                ioConfig.getBaseSequenceName(),
+                ioConfig.getTaskGroupId(),
                 new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())),
                 new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets))
-            ))) {
+            );
+            if (!toolbox.getTaskActionClient().submit(checkpointAction)) {
               throw new ISE("Checkpoint request with offsets [%s] failed, dying", nextOffsets);
             }
           }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
index 4dd3aaf..b6c1d76 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
@@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
 import io.druid.segment.indexing.IOConfig;
 import org.joda.time.DateTime;
 
+import javax.annotation.Nullable;
 import java.util.Map;
 
 public class KafkaIOConfig implements IOConfig
@@ -33,6 +34,8 @@ public class KafkaIOConfig implements IOConfig
   private static final boolean DEFAULT_USE_TRANSACTION = true;
   private static final boolean DEFAULT_SKIP_OFFSET_GAPS = false;
 
+  @Nullable
+  private final Integer taskGroupId;
   private final String baseSequenceName;
   private final KafkaPartitions startPartitions;
   private final KafkaPartitions endPartitions;
@@ -44,6 +47,7 @@ public class KafkaIOConfig implements IOConfig
 
   @JsonCreator
   public KafkaIOConfig(
+      @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // can be 
null for backward compabitility
       @JsonProperty("baseSequenceName") String baseSequenceName,
       @JsonProperty("startPartitions") KafkaPartitions startPartitions,
       @JsonProperty("endPartitions") KafkaPartitions endPartitions,
@@ -54,6 +58,7 @@ public class KafkaIOConfig implements IOConfig
       @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
   )
   {
+    this.taskGroupId = taskGroupId;
     this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName");
     this.startPartitions = Preconditions.checkNotNull(startPartitions, "startPartitions");
     this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions");
@@ -83,6 +88,13 @@ public class KafkaIOConfig implements IOConfig
     }
   }
 
+  @Nullable
+  @JsonProperty
+  public Integer getTaskGroupId()
+  {
+    return taskGroupId;
+  }
+
   @JsonProperty
   public String getBaseSequenceName()
   {
@@ -135,7 +147,8 @@ public class KafkaIOConfig implements IOConfig
   public String toString()
   {
     return "KafkaIOConfig{" +
-           "baseSequenceName='" + baseSequenceName + '\'' +
+           "taskGroupId=" + taskGroupId +
+           ", baseSequenceName='" + baseSequenceName + '\'' +
            ", startPartitions=" + startPartitions +
            ", endPartitions=" + endPartitions +
            ", consumerProperties=" + consumerProperties +
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 8ea13ef..ed287fa 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -143,8 +143,7 @@ public class KafkaSupervisor implements Supervisor
   * time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups]
   * map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]).
    */
-  @VisibleForTesting
-  static class TaskGroup
+  private static class TaskGroup
   {
     // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
     // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
@@ -159,7 +158,7 @@ public class KafkaSupervisor implements Supervisor
     DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action
     final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new TreeMap<>();
 
-    public TaskGroup(
+    TaskGroup(
         ImmutableMap<Integer, Long> partitionOffsets,
         Optional<DateTime> minimumMessageTime,
         Optional<DateTime> maximumMessageTime
@@ -171,7 +170,7 @@ public class KafkaSupervisor implements Supervisor
       this.sequenceOffsets.put(0, partitionOffsets);
     }
 
-    public int addNewCheckpoint(Map<Integer, Long> checkpoint)
+    int addNewCheckpoint(Map<Integer, Long> checkpoint)
     {
       sequenceOffsets.put(sequenceOffsets.lastKey() + 1, checkpoint);
       return sequenceOffsets.lastKey();
@@ -212,9 +211,6 @@ public class KafkaSupervisor implements Supervisor
   private final ConcurrentHashMap<Integer, ConcurrentHashMap<Integer, Long>> partitionGroups = new ConcurrentHashMap<>();
   // --------------------------------------------------------
 
-  // BaseSequenceName -> TaskGroup
-  private final ConcurrentHashMap<String, TaskGroup> sequenceTaskGroup = new ConcurrentHashMap<>();
-
   private final TaskStorage taskStorage;
   private final TaskMaster taskMaster;
   private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
@@ -513,13 +509,9 @@ public class KafkaSupervisor implements Supervisor
   }
 
   @Override
-  public void checkpoint(
-      String sequenceName,
-      DataSourceMetadata previousCheckpoint,
-      DataSourceMetadata currentCheckpoint
-  )
+  public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, DataSourceMetadata currentCheckpoint)
   {
-    Preconditions.checkNotNull(sequenceName, "Cannot checkpoint without a sequence name");
+    Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint");
     Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null");
     Preconditions.checkArgument(
         ioConfig.getTopic()
@@ -530,12 +522,14 @@ public class KafkaSupervisor implements Supervisor
         ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()
     );
 
-    log.info("Checkpointing [%s] for sequence [%s]", currentCheckpoint, 
sequenceName);
-    notices.add(new CheckpointNotice(
-        sequenceName,
-        (KafkaDataSourceMetadata) previousCheckpoint,
-        (KafkaDataSourceMetadata) currentCheckpoint
-    ));
+    log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckpoint, 
taskGroupId);
+    notices.add(
+        new CheckpointNotice(
+            taskGroupId,
+            (KafkaDataSourceMetadata) previousCheckpoint,
+            (KafkaDataSourceMetadata) currentCheckpoint
+        )
+    );
   }
 
   public void possiblyRegisterListener()
@@ -637,17 +631,17 @@ public class KafkaSupervisor implements Supervisor
 
   private class CheckpointNotice implements Notice
   {
-    final String sequenceName;
+    final int taskGroupId;
     final KafkaDataSourceMetadata previousCheckpoint;
     final KafkaDataSourceMetadata currentCheckpoint;
 
     CheckpointNotice(
-        String sequenceName,
+        int taskGroupId,
         KafkaDataSourceMetadata previousCheckpoint,
         KafkaDataSourceMetadata currentCheckpoint
     )
     {
-      this.sequenceName = sequenceName;
+      this.taskGroupId = taskGroupId;
       this.previousCheckpoint = previousCheckpoint;
       this.currentCheckpoint = currentCheckpoint;
     }
@@ -658,17 +652,12 @@ public class KafkaSupervisor implements Supervisor
       // check for consistency
       // if already received request for this sequenceName and dataSourceMetadata combination then return
 
-      Preconditions.checkNotNull(
-          sequenceTaskGroup.get(sequenceName),
-          "WTH?! cannot find task group for this sequence [%s], sequencesTaskGroup map [%s], taskGroups [%s]",
-          sequenceName,
-          sequenceTaskGroup,
-          taskGroups
-      );
-      final TreeMap<Integer, Map<Integer, Long>> checkpoints = sequenceTaskGroup.get(sequenceName).sequenceOffsets;
+      final TaskGroup taskGroup = taskGroups.get(taskGroupId);
+
+      if (isValidTaskGroup(taskGroup)) {
+        final TreeMap<Integer, Map<Integer, Long>> checkpoints = taskGroup.sequenceOffsets;
 
-      // check validity of previousCheckpoint if it is not null
-      if (previousCheckpoint != null) {
+        // check validity of previousCheckpoint
         int index = checkpoints.size();
         for (int sequenceId : checkpoints.descendingKeySet()) {
           Map<Integer, Long> checkpoint = checkpoints.get(sequenceId);
@@ -685,26 +674,39 @@ public class KafkaSupervisor implements Supervisor
           log.info("Already checkpointed with offsets [%s]", 
checkpoints.lastEntry().getValue());
           return;
         }
-      } else {
-        // There cannot be more than one checkpoint when previous checkpoint is null
-        // as when the task starts they are sent existing checkpoints
-        Preconditions.checkState(
-            checkpoints.size() <= 1,
-            "Got checkpoint request with null as previous check point, however found more than one checkpoints"
+        final int taskGroupId = getTaskGroupIdForPartition(
+            currentCheckpoint.getKafkaPartitions()
+                             .getPartitionOffsetMap()
+                             .keySet()
+                             .iterator()
+                             .next()
         );
-        if (checkpoints.size() == 1) {
-          log.info("Already checkpointed with dataSourceMetadata [%s]", checkpoints.get(0));
-          return;
+        final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
+        taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
+        log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
+      }
+    }
+
+    private boolean isValidTaskGroup(@Nullable TaskGroup taskGroup)
+    {
+      if (taskGroup == null) {
+        // taskGroup might be in pendingCompletionTaskGroups or partitionGroups
+        if (pendingCompletionTaskGroups.containsKey(taskGroupId)) {
+          log.warn(
+              "Ignoring checkpoint request because taskGroup[%d] has already stopped indexing and is waiting for "
+              + "publishing segments",
+              taskGroupId
+          );
+          return false;
+        } else if (partitionGroups.containsKey(taskGroupId)) {
+          log.warn("Ignoring checkpoint request because taskGroup[%d] is inactive", taskGroupId);
+          return false;
+        } else {
+          throw new ISE("WTH?! cannot find taskGroup [%s] among all taskGroups [%s]", taskGroupId, taskGroups);
         }
       }
-      final int taskGroupId = getTaskGroupIdForPartition(currentCheckpoint.getKafkaPartitions()
-                                                                          .getPartitionOffsetMap()
-                                                                          .keySet()
-                                                                          .iterator()
-                                                                          .next());
-      final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
-      sequenceTaskGroup.get(sequenceName).addNewCheckpoint(newCheckpoint);
-      log.info("Handled checkpoint notice, new checkpoint is [%s] for sequence [%s]", newCheckpoint, sequenceName);
+
+      return true;
     }
   }
 
@@ -718,7 +720,6 @@ public class KafkaSupervisor implements Supervisor
       taskGroups.values().forEach(this::killTasksInGroup);
       taskGroups.clear();
       partitionGroups.clear();
-      sequenceTaskGroup.clear();
     } else if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) {
       throw new IAE("Expected KafkaDataSourceMetadata but found instance of 
[%s]", dataSourceMetadata.getClass());
     } else {
@@ -778,8 +779,7 @@ public class KafkaSupervisor implements Supervisor
           resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition -> {
             final int groupId = getTaskGroupIdForPartition(partition);
             killTaskGroupForPartitions(ImmutableSet.of(partition));
-            final TaskGroup removedGroup = taskGroups.remove(groupId);
-            sequenceTaskGroup.remove(generateSequenceName(removedGroup));
+            taskGroups.remove(groupId);
             partitionGroups.get(groupId).replaceAll((partitionId, offset) -> NOT_SET);
           });
         } else {
@@ -955,9 +955,10 @@ public class KafkaSupervisor implements Supervisor
     for (int partition = 0; partition < numPartitions; partition++) {
       int taskGroupId = getTaskGroupIdForPartition(partition);
 
-      partitionGroups.putIfAbsent(taskGroupId, new ConcurrentHashMap<Integer, Long>());
-
-      ConcurrentHashMap<Integer, Long> partitionMap = partitionGroups.get(taskGroupId);
+      ConcurrentHashMap<Integer, Long> partitionMap = partitionGroups.computeIfAbsent(
+          taskGroupId,
+          k -> new ConcurrentHashMap<>()
+      );
 
       // The starting offset for a new partition in [partitionGroups] is initially set to NOT_SET; when a new task group
       // is created and is assigned partitions, if the offset in [partitionGroups] is NOT_SET it will take the starting
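
Several call sites in this commit also move from a putIfAbsent-then-get pair
to a single computeIfAbsent call, which creates and returns the mapping
atomically and only allocates the new value on a miss; a minimal sketch:

    import java.util.concurrent.ConcurrentHashMap;

    class ComputeIfAbsentSketch
    {
      private final ConcurrentHashMap<Integer, ConcurrentHashMap<Integer, Long>> partitionGroups =
          new ConcurrentHashMap<>();

      ConcurrentHashMap<Integer, Long> partitionGroup(int taskGroupId)
      {
        // Before: two calls, allocating a map even when one already existed:
        //   partitionGroups.putIfAbsent(taskGroupId, new ConcurrentHashMap<>());
        //   return partitionGroups.get(taskGroupId);
        // After: one atomic call; the mapping function runs only on a miss.
        return partitionGroups.computeIfAbsent(taskGroupId, k -> new ConcurrentHashMap<>());
      }
    }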
@@ -1087,23 +1088,21 @@ public class KafkaSupervisor implements Supervisor
                             }
                             return false;
                           } else {
-                            final TaskGroup taskGroup = new TaskGroup(
-                                ImmutableMap.copyOf(
-                                    kafkaTask.getIOConfig()
-                                             .getStartPartitions()
-                                             .getPartitionOffsetMap()
-                                ), kafkaTask.getIOConfig().getMinimumMessageTime(),
-                                kafkaTask.getIOConfig().getMaximumMessageTime()
-                            );
-                            if (taskGroups.putIfAbsent(
+                            final TaskGroup taskGroup = taskGroups.computeIfAbsent(
                                 taskGroupId,
-                                taskGroup
-                            ) == null) {
-                              sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(taskGroupId));
-                              log.info("Created new task group [%d]", taskGroupId);
-                            }
+                                k -> {
+                                  log.info("Creating a new task group for taskGroupId[%d]", taskGroupId);
+                                  return new TaskGroup(
+                                      ImmutableMap.copyOf(
+                                          kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()
+                                      ),
+                                      kafkaTask.getIOConfig().getMinimumMessageTime(),
+                                      kafkaTask.getIOConfig().getMaximumMessageTime()
+                                  );
+                                }
+                            );
                             taskGroupsToVerify.add(taskGroupId);
-                            taskGroups.get(taskGroupId).tasks.putIfAbsent(taskId, new TaskData());
+                            taskGroup.tasks.putIfAbsent(taskId, new TaskData());
                           }
                         }
                         return true;
@@ -1256,7 +1255,6 @@ public class KafkaSupervisor implements Supervisor
       // killing all tasks or no task left in the group ?
       // clear state about the taskgroup so that get latest offset information is fetched from metadata store
       log.warn("Clearing task group [%d] information as no valid tasks left the group", groupId);
-      sequenceTaskGroup.remove(generateSequenceName(taskGroup));
       taskGroups.remove(groupId);
       partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
     }
@@ -1281,9 +1279,10 @@ public class KafkaSupervisor implements Supervisor
       Map<Integer, Long> startingPartitions
   )
   {
-    pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.<TaskGroup>newCopyOnWriteArrayList());
-
-    CopyOnWriteArrayList<TaskGroup> taskGroupList = pendingCompletionTaskGroups.get(groupId);
+    final CopyOnWriteArrayList<TaskGroup> taskGroupList = pendingCompletionTaskGroups.computeIfAbsent(
+        groupId,
+        k -> new CopyOnWriteArrayList<>()
+    );
     for (TaskGroup taskGroup : taskGroupList) {
       if (taskGroup.partitionOffsets.equals(startingPartitions)) {
         if (taskGroup.tasks.putIfAbsent(taskId, new TaskData()) == null) {
@@ -1411,8 +1410,7 @@ public class KafkaSupervisor implements Supervisor
       if (endOffsets != null) {
         // set a timeout and put this group in pendingCompletionTaskGroups so that it can be monitored for completion
         group.completionTimeout = DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
-        pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.<TaskGroup>newCopyOnWriteArrayList());
-        pendingCompletionTaskGroups.get(groupId).add(group);
+        pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group);
 
         // set endOffsets as the next startOffsets
         for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
@@ -1432,7 +1430,6 @@ public class KafkaSupervisor implements Supervisor
         partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
       }
 
-      sequenceTaskGroup.remove(generateSequenceName(group));
       // remove this task group from the list of current task groups now that it has been handled
       taskGroups.remove(groupId);
     }
@@ -1456,7 +1453,8 @@ public class KafkaSupervisor implements Supervisor
           // metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing
           // failed and we need to re-ingest)
           return Futures.transform(
-              stopTasksInGroup(taskGroup), new Function<Object, Map<Integer, Long>>()
+              stopTasksInGroup(taskGroup),
+              new Function<Object, Map<Integer, Long>>()
               {
                 @Nullable
                 @Override
@@ -1625,15 +1623,15 @@ public class KafkaSupervisor implements Supervisor
             log.warn("All tasks in group [%d] failed to publish, killing all 
tasks for these partitions", groupId);
           } else {
             log.makeAlert(
-                "No task in [%s] succeeded before the completion timeout 
elapsed [%s]!",
+                "No task in [%s] for taskGroup [%d] succeeded before the 
completion timeout elapsed [%s]!",
                 group.taskIds(),
+                groupId,
                 ioConfig.getCompletionTimeout()
             ).emit();
           }
 
           // reset partitions offsets for this task group so that they will be re-read from metadata storage
           partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
-          sequenceTaskGroup.remove(generateSequenceName(group));
           // kill all the tasks in this pending completion group
           killTasksInGroup(group);
           // set a flag so the other pending completion groups for this set of partitions will also stop
@@ -1693,7 +1691,6 @@ public class KafkaSupervisor implements Supervisor
         // be recreated with the next set of offsets
         if (taskData.status.isSuccess()) {
           futures.add(stopTasksInGroup(taskGroup));
-          sequenceTaskGroup.remove(generateSequenceName(taskGroup));
           iTaskGroups.remove();
           break;
         }
@@ -1735,7 +1732,6 @@ public class KafkaSupervisor implements Supervisor
             groupId,
             taskGroup
         );
-        sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(groupId));
       }
     }
 
@@ -1778,6 +1774,7 @@ public class KafkaSupervisor implements Supervisor
     DateTime maximumMessageTime = taskGroups.get(groupId).maximumMessageTime.orNull();
 
     KafkaIOConfig kafkaIOConfig = new KafkaIOConfig(
+        groupId,
         sequenceName,
         new KafkaPartitions(ioConfig.getTopic(), startPartitions),
         new KafkaPartitions(ioConfig.getTopic(), endPartitions),
@@ -1944,7 +1941,7 @@ public class KafkaSupervisor implements Supervisor
     }
   }
 
-  private ListenableFuture<?> stopTasksInGroup(TaskGroup taskGroup)
+  private ListenableFuture<?> stopTasksInGroup(@Nullable TaskGroup taskGroup)
   {
     if (taskGroup == null) {
       return Futures.immediateFuture(null);
@@ -2289,6 +2286,28 @@ public class KafkaSupervisor implements Supervisor
     return allStats;
   }
 
+  @VisibleForTesting
+  @Nullable
+  TaskGroup removeTaskGroup(int taskGroupId)
+  {
+    return taskGroups.remove(taskGroupId);
+  }
+
+  @VisibleForTesting
+  void moveTaskGroupToPendingCompletion(int taskGroupId)
+  {
+    final TaskGroup taskGroup = taskGroups.remove(taskGroupId);
+    if (taskGroup != null) {
+      pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, k -> new CopyOnWriteArrayList<>()).add(taskGroup);
+    }
+  }
+
+  @VisibleForTesting
+  int getNoticesQueueSize()
+  {
+    return notices.size();
+  }
+
   private static class StatsFromTaskResult
   {
     private final String groupId;
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
index 3bc55e2..050dba7 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
@@ -50,6 +50,7 @@ public class KafkaIOConfigTest
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -82,6 +83,7 @@ public class KafkaIOConfigTest
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -118,6 +120,7 @@ public class KafkaIOConfigTest
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
                      + "  \"consumerProperties\": 
{\"bootstrap.servers\":\"localhost:9092\"},\n"
@@ -137,6 +140,7 @@ public class KafkaIOConfigTest
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
                      + "  \"consumerProperties\": 
{\"bootstrap.servers\":\"localhost:9092\"},\n"
@@ -156,6 +160,7 @@ public class KafkaIOConfigTest
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"consumerProperties\": 
{\"bootstrap.servers\":\"localhost:9092\"},\n"
@@ -175,6 +180,7 @@ public class KafkaIOConfigTest
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -194,6 +200,7 @@ public class KafkaIOConfigTest
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"other\", 
\"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -214,6 +221,7 @@ public class KafkaIOConfigTest
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":15}},\n"
@@ -234,6 +242,7 @@ public class KafkaIOConfigTest
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":15, \"1\":2}},\n"
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index 15f4bb0..411fff9 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -262,21 +262,21 @@ public class KafkaIndexTaskTest
   private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
   {
     return ImmutableList.of(
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2008", "a", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2009", "b", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2010", "c", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2011", "d", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2011", "e", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, StringUtils.toUtf8("unparseable")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, StringUtils.toUtf8("unparseable2")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, null),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2013", "f", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2049", "f", "y", "notanumber", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2049", "f", "y", "10", "notanumber", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2049", "f", "y", "10", "20.0", "notanumber")),
-        new ProducerRecord<byte[], byte[]>(topic, 1, null, JB("2012", "g", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<byte[], byte[]>(topic, 1, null, JB("2011", "h", "y", "10", "20.0", "1.0"))
+        new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2011", "e", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")),
+        new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable2")),
+        new ProducerRecord<>(topic, 0, null, null),
+        new ProducerRecord<>(topic, 0, null, JB("2013", "f", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "notanumber", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "10", "notanumber", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "10", "20.0", "notanumber")),
+        new ProducerRecord<>(topic, 1, null, JB("2012", "g", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 1, null, JB("2011", "h", "y", "10", "20.0", "1.0"))
     );
   }
 
@@ -377,6 +377,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -418,6 +419,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -493,6 +495,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             baseSequenceName,
             startPartitions,
             endPartitions,
@@ -514,14 +517,16 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
 
     Assert.assertEquals(1, checkpointRequestsHash.size());
-    Assert.assertTrue(checkpointRequestsHash.contains(
-        Objects.hash(
-            DATA_SCHEMA.getDataSource(),
-            baseSequenceName,
-            new KafkaDataSourceMetadata(startPartitions),
-            new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets))
+    Assert.assertTrue(
+        checkpointRequestsHash.contains(
+            Objects.hash(
+                DATA_SCHEMA.getDataSource(),
+                0,
+                new KafkaDataSourceMetadata(startPartitions),
+                new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets))
+            )
         )
-    ));
+    );
 
     // Check metrics
     Assert.assertEquals(8, task.getRunner().getRowIngestionMeters().getProcessed());
@@ -581,6 +586,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             baseSequenceName,
             startPartitions,
             endPartitions,
@@ -603,14 +609,16 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
 
     Assert.assertEquals(1, checkpointRequestsHash.size());
-    Assert.assertTrue(checkpointRequestsHash.contains(
-        Objects.hash(
-            DATA_SCHEMA.getDataSource(),
-            baseSequenceName,
-            new KafkaDataSourceMetadata(startPartitions),
-            new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoint.getPartitionOffsetMap()))
+    Assert.assertTrue(
+        checkpointRequestsHash.contains(
+            Objects.hash(
+                DATA_SCHEMA.getDataSource(),
+                0,
+                new KafkaDataSourceMetadata(startPartitions),
+                new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoint.getPartitionOffsetMap()))
+            )
         )
-    ));
+    );
 
     // Check metrics
     Assert.assertEquals(2, task.getRunner().getRowIngestionMeters().getProcessed());
@@ -637,6 +645,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -690,6 +699,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -753,6 +763,7 @@ public class KafkaIndexTaskTest
             )
         ),
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -812,6 +823,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
@@ -852,6 +864,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -903,6 +916,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -957,6 +971,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 7L)),
@@ -1000,6 +1015,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 13L)),
@@ -1081,6 +1097,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
@@ -1140,6 +1157,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task1 = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1153,6 +1171,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task2 = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1206,6 +1225,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task1 = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1219,6 +1239,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task2 = createTask(
         null,
         new KafkaIOConfig(
+            1,
             "sequence1",
             new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
@@ -1273,6 +1294,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task1 = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1286,6 +1308,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task2 = createTask(
         null,
         new KafkaIOConfig(
+            1,
             "sequence1",
             new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
@@ -1345,6 +1368,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L, 1, 0L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 2L)),
@@ -1409,6 +1433,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task1 = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1422,6 +1447,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task2 = createTask(
         null,
         new KafkaIOConfig(
+            1,
             "sequence1",
             new KafkaPartitions(topic, ImmutableMap.of(1, 0L)),
             new KafkaPartitions(topic, ImmutableMap.of(1, 1L)),
@@ -1477,6 +1503,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task1 = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1513,6 +1540,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task2 = createTask(
         task1.getId(),
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1564,6 +1592,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1647,6 +1676,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1685,6 +1715,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 200L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 500L)),
@@ -1737,6 +1768,7 @@ public class KafkaIndexTaskTest
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             // task should ignore these and use sequence info sent in the context
             new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
@@ -2026,18 +2058,20 @@ public class KafkaIndexTaskTest
           @Override
           public boolean checkPointDataSourceMetadata(
               String supervisorId,
-              @Nullable String sequenceName,
+              int taskGroupId,
               @Nullable DataSourceMetadata previousDataSourceMetadata,
               @Nullable DataSourceMetadata currentDataSourceMetadata
           )
           {
             log.info("Adding checkpoint hash to the set");
-            checkpointRequestsHash.add(Objects.hash(
-                supervisorId,
-                sequenceName,
-                previousDataSourceMetadata,
-                currentDataSourceMetadata
-            ));
+            checkpointRequestsHash.add(
+                Objects.hash(
+                    supervisorId,
+                    taskGroupId,
+                    previousDataSourceMetadata,
+                    currentDataSourceMetadata
+                )
+            );
             return true;
           }
         }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 86648fe..f0f6033 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -19,6 +19,7 @@
 
 package io.druid.indexing.kafka.supervisor;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
@@ -61,6 +62,7 @@ import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.granularity.Granularities;
 import io.druid.java.util.common.parsers.JSONPathFieldSpec;
 import io.druid.java.util.common.parsers.JSONPathSpec;
+import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
 import io.druid.segment.TestHelper;
@@ -70,6 +72,7 @@ import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.segment.realtime.FireDepartment;
 import io.druid.server.metrics.DruidMonitorSchedulerConfig;
 import io.druid.server.metrics.NoopServiceEmitter;
+import io.druid.server.metrics.ExceptionCapturingServiceEmitter;
 import org.apache.curator.test.TestingCluster;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -99,7 +102,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
 
 import static org.easymock.EasyMock.anyBoolean;
 import static org.easymock.EasyMock.anyObject;
@@ -141,6 +146,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   private TaskQueue taskQueue;
   private String topic;
   private RowIngestionMetersFactory rowIngestionMetersFactory;
+  private ExceptionCapturingServiceEmitter serviceEmitter;
 
   private static String getTopic()
   {
@@ -213,6 +219,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     topic = getTopic();
     rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
+    serviceEmitter = new ExceptionCapturingServiceEmitter();
+    EmittingLogger.registerEmitter(serviceEmitter);
   }
 
   @After
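
The setup above registers the commit's new ExceptionCapturingServiceEmitter
with EmittingLogger so tests can assert on alerts raised via
log.makeAlert(...).emit(). The 71-line emitter itself is not shown in this
excerpt; a generic sketch of the capture pattern it presumably follows
(illustrative interface, not the real Druid ServiceEmitter API):

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    class CapturingEmitterSketch
    {
      interface Emitter
      {
        void emit(String event);
      }

      // Records every emitted event instead of sending it anywhere, so a
      // test can later assert which alerts (if any) were raised.
      static class CapturingEmitter implements Emitter
      {
        final List<String> events = new CopyOnWriteArrayList<>();

        @Override
        public void emit(String event)
        {
          events.add(event);
        }
      }
    }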
@@ -553,7 +561,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "index_kafka_testDS__some_other_sequenceName",
+        1,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L)),
         null,
@@ -564,7 +572,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id2 = createKafkaIndexTask(
         "id2",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 
333L)),
         null,
@@ -575,7 +583,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id3 = createKafkaIndexTask(
         "id3",
         DATASOURCE,
-        "index_kafka_testDS__some_other_sequenceName",
+        1,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 1L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 
330L)),
         null,
@@ -586,7 +594,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id4 = createKafkaIndexTask(
         "id4",
         "other-datasource",
-        "index_kafka_testDS_d927edff33c4b3f",
+        2,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L)),
         null,
@@ -634,7 +642,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
     checkpoints.put(0, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), 
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), 
anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(2);
 
     replayAll();
 
@@ -652,7 +662,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)),
         null,
@@ -661,7 +671,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id2 = createKafkaIndexTask(
         "id2",
         DATASOURCE,
-        "sequenceName-1",
+        1,
         new KafkaPartitions("topic", ImmutableMap.of(1, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE)),
         null,
@@ -670,7 +680,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id3 = createKafkaIndexTask(
         "id3",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -679,7 +689,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id4 = createKafkaIndexTask(
         "id4",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE)),
         null,
@@ -688,7 +698,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id5 = createKafkaIndexTask(
         "id5",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)),
         null,
@@ -727,8 +737,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
     checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), 
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)).times(1);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), 
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), 
anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), 
anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(1);
 
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     taskQueue.shutdown("id4");
@@ -765,10 +779,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
     checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), 
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-                                                                               
         .anyTimes();
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), 
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-                                                                               
         .anyTimes();
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), 
anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .anyTimes();
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), 
anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .anyTimes();
 
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     replayAll();
@@ -830,7 +846,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)),
         now,
@@ -857,7 +873,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
     checkpoints.put(0, ImmutableMap.of(0, 0L, 2, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), 
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), 
anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(2);
 
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     replayAll();
@@ -878,9 +896,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
     reset(taskClient);
 
     // for the newly created replica task
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), 
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints))
-                                                                               
         .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), 
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), 
anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), 
anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
 
     expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(captured.getValue())).anyTimes();
     expect(taskStorage.getStatus(iHaveFailed.getId())).andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId())));
@@ -953,10 +974,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
     // there would be 4 tasks, 2 for each task group
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-                                                                                        .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-                                                                                        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(2);
 
     expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
     for (Task task : tasks) {
@@ -1063,10 +1086,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
     checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-                                                                                        .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-                                                                                        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(2);
 
     replay(taskStorage, taskRunner, taskClient, taskQueue);
 
@@ -1100,7 +1125,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task task = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1192,7 +1217,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task task = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)),
         null,
@@ -1282,7 +1307,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1292,7 +1317,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id2 = createKafkaIndexTask(
         "id2",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 1L, 1, 2L, 2, 3L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1330,7 +1355,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
     // since id1 is publishing, so getCheckpoints wouldn't be called for it
     TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
     checkpoints.put(0, ImmutableMap.of(0, 1L, 1, 2L, 2, 3L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
 
     replayAll();
 
@@ -1404,10 +1431,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
     checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-                                                                                        .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-                                                                                        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(2);
 
     expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
     for (Task task : tasks) {
@@ -1463,10 +1492,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
     checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-                                                                                        .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-                                                                                        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(2);
 
     captured = Capture.newInstance(CaptureType.ALL);
     expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
@@ -1540,10 +1571,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
     checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-                                                                                        .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-                                                                                        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(2);
 
     captured = Capture.newInstance(CaptureType.ALL);
     expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
@@ -1622,7 +1655,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1632,7 +1665,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id2 = createKafkaIndexTask(
         "id2",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1642,7 +1675,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id3 = createKafkaIndexTask(
         "id3",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1678,8 +1711,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
     // getCheckpoints will not be called for id1 as it is in publishing state
     TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
     checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
 
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     replayAll();
@@ -1824,7 +1861,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1834,7 +1871,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id2 = createKafkaIndexTask(
         "id2",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1844,7 +1881,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id3 = createKafkaIndexTask(
         "id3",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1879,8 +1916,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
     checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
 
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     replayAll();
@@ -1908,7 +1949,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1918,7 +1959,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id2 = createKafkaIndexTask(
         "id2",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1928,7 +1969,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Task id3 = createKafkaIndexTask(
         "id3",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1958,9 +1999,15 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
     checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
 
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     replayAll();
@@ -1980,6 +2027,172 @@ public class KafkaSupervisorTest extends EasyMockSupport
     verifyAll();
   }
 
+  @Test(timeout = 60_000L)
+  public void testCheckpointForInactiveTaskGroup()
+      throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
+  {
+    supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+    //not adding any events
+    final Task id1 = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id2 = createKafkaIndexTask(
+        "id2",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id3 = createKafkaIndexTask(
+        "id3",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+    expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+    expect(
+        indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)
+    ).anyTimes();
+    expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+    expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+    expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+
+    final DateTime startTime = DateTimes.nowUtc();
+    expect(taskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime));
+    expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
+    expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));
+
+    final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+    checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+
+    taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+
+    final Map<Integer, Long> fakeCheckpoints = Collections.emptyMap();
+    supervisor.moveTaskGroupToPendingCompletion(0);
+    supervisor.checkpoint(
+        0,
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, fakeCheckpoints))
+    );
+
+    while (supervisor.getNoticesQueueSize() > 0) {
+      Thread.sleep(100);
+    }
+
+    verifyAll();
+
+    Assert.assertNull(serviceEmitter.getStackTrace());
+    Assert.assertNull(serviceEmitter.getExceptionMessage());
+    Assert.assertNull(serviceEmitter.getExceptionClass());
+  }
+
+  @Test(timeout = 60_000L)
+  public void testCheckpointForUnknownTaskGroup() throws InterruptedException
+  {
+    supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+    //not adding any events
+    final Task id1 = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id2 = createKafkaIndexTask(
+        "id2",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id3 = createKafkaIndexTask(
+        "id3",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+    expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+    expect(
+        indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)
+    ).anyTimes();
+
+    replayAll();
+
+    supervisor.start();
+
+    supervisor.checkpoint(
+        0,
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap()))
+    );
+
+    while (supervisor.getNoticesQueueSize() > 0) {
+      Thread.sleep(100);
+    }
+
+    verifyAll();
+
+    Assert.assertNotNull(serviceEmitter.getStackTrace());
+    Assert.assertEquals(
+        "WTH?! cannot find taskGroup [0] among all taskGroups [{}]",
+        serviceEmitter.getExceptionMessage()
+    );
+    Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
+  }
+
   private void addSomeEvents(int numEventsPerPartition) throws Exception
   {
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
@@ -2106,7 +2319,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   private KafkaIndexTask createKafkaIndexTask(
       String id,
       String dataSource,
-      String sequenceName,
+      int taskGroupId,
       KafkaPartitions startPartitions,
       KafkaPartitions endPartitions,
       DateTime minimumMessageTime,
@@ -2119,7 +2332,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         getDataSchema(dataSource),
         tuningConfig,
         new KafkaIOConfig(
-            sequenceName,
+            taskGroupId,
+            "sequenceName-" + taskGroupId,
             startPartitions,
             endPartitions,
             ImmutableMap.<String, String>of(),
@@ -2128,7 +2342,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
             maximumMessageTime,
             false
         ),
-        ImmutableMap.<String, Object>of(),
+        Collections.emptyMap(),
         null,
         null,
         rowIngestionMetersFactory
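
A note on the mock style above: the patch consistently reflows EasyMock expectations to one chained call per line. A minimal, self-contained sketch of the idiom, using an illustrative stand-in interface rather than the real taskClient mocked in these tests:

    import static org.easymock.EasyMock.anyBoolean;
    import static org.easymock.EasyMock.contains;
    import static org.easymock.EasyMock.createMock;
    import static org.easymock.EasyMock.expect;
    import static org.easymock.EasyMock.replay;
    import static org.easymock.EasyMock.verify;

    import com.google.common.collect.ImmutableMap;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import java.util.Map;
    import java.util.TreeMap;

    public class ExpectationStyleDemo
    {
      // Illustrative stand-in; not the real task client interface.
      interface CheckpointClient
      {
        ListenableFuture<TreeMap<Integer, Map<Integer, Long>>> getCheckpointsAsync(String taskId, boolean retry);
      }

      public static void main(String[] args)
      {
        final CheckpointClient client = createMock(CheckpointClient.class);
        final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
        checkpoints.put(0, ImmutableMap.of(0, 10L));
        // One chained call per line, matching the formatting adopted in this patch.
        expect(client.getCheckpointsAsync(contains("id2"), anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints))
            .times(1);
        replay(client);
        client.getCheckpointsAsync("task-id2", true);
        verify(client);
      }
    }
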
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
index 8265f87..f1d11de 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
@@ -21,27 +21,28 @@ package io.druid.indexing.common.actions;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.overlord.DataSourceMetadata;
 
 public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
 {
   private final String supervisorId;
-  private final String sequenceName;
+  private final int taskGroupId;
   private final DataSourceMetadata previousCheckPoint;
   private final DataSourceMetadata currentCheckPoint;
 
   public CheckPointDataSourceMetadataAction(
       @JsonProperty("supervisorId") String supervisorId,
-      @JsonProperty("sequenceName") String sequenceName,
+      @JsonProperty("taskGroupId") Integer taskGroupId,
       @JsonProperty("previousCheckPoint") DataSourceMetadata 
previousCheckPoint,
       @JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint
   )
   {
-    this.supervisorId = supervisorId;
-    this.sequenceName = sequenceName;
-    this.previousCheckPoint = previousCheckPoint;
-    this.currentCheckPoint = currentCheckPoint;
+    this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId");
+    this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId");
+    this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint");
+    this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint");
   }
 
   @JsonProperty
@@ -51,9 +52,9 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
   }
 
   @JsonProperty
-  public String getSequenceName()
+  public int getTaskGroupId()
   {
-    return sequenceName;
+    return taskGroupId;
   }
 
   @JsonProperty
@@ -81,8 +82,12 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
       Task task, TaskActionToolbox toolbox
   )
   {
-    return toolbox.getSupervisorManager()
-                  .checkPointDataSourceMetadata(supervisorId, sequenceName, previousCheckPoint, currentCheckPoint);
+    return toolbox.getSupervisorManager().checkPointDataSourceMetadata(
+        supervisorId,
+        taskGroupId,
+        previousCheckPoint,
+        currentCheckPoint
+    );
   }
 
   @Override
@@ -96,7 +101,7 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
   {
     return "CheckPointDataSourceMetadataAction{" +
            "supervisorId='" + supervisorId + '\'' +
-           ", sequenceName='" + sequenceName + '\'' +
+           ", taskGroupId='" + taskGroupId + '\'' +
            ", previousCheckPoint=" + previousCheckPoint +
            ", currentCheckPoint=" + currentCheckPoint +
            '}';
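
With the constructor now rejecting nulls, callers build the action along these lines (a sketch; the supervisor id and offsets are placeholders, not values from this patch):

    // Nulls fail fast via Preconditions.checkNotNull in the constructor above.
    final DataSourceMetadata previous =
        new KafkaDataSourceMetadata(new KafkaPartitions("topic", ImmutableMap.of(0, 0L)));
    final DataSourceMetadata current =
        new KafkaDataSourceMetadata(new KafkaPartitions("topic", ImmutableMap.of(0, 10L)));
    final CheckPointDataSourceMetadataAction action = new CheckPointDataSourceMetadataAction(
        "kafka-supervisor-1",  // supervisorId (placeholder)
        0,                     // taskGroupId; replaces the old "sequenceName" string property
        previous,
        current
    );
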
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
index dcdd014..f9a5564 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -165,9 +165,9 @@ public class SupervisorManager
 
   public boolean checkPointDataSourceMetadata(
       String supervisorId,
-      @Nullable String sequenceName,
-      @Nullable DataSourceMetadata previousDataSourceMetadata,
-      @Nullable DataSourceMetadata currentDataSourceMetadata
+      int taskGroupId,
+      DataSourceMetadata previousDataSourceMetadata,
+      DataSourceMetadata currentDataSourceMetadata
   )
   {
     try {
@@ -178,7 +178,7 @@ public class SupervisorManager
 
       Preconditions.checkNotNull(supervisor, "supervisor could not be found");
 
-      supervisor.lhs.checkpoint(sequenceName, previousDataSourceMetadata, currentDataSourceMetadata);
+      supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata, currentDataSourceMetadata);
       return true;
     }
     catch (Exception e) {
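
In other words, the overlord-side checkpoint path is now typed as an int end to end. A usage sketch (the supervisor id and metadata values are placeholders):

    // Returns true when the supervisor was found and accepted the checkpoint;
    // failures are handled in the catch branch, which is elided above.
    final boolean accepted = supervisorManager.checkPointDataSourceMetadata(
        "kafka-supervisor-1",  // supervisorId (placeholder)
        0,                     // taskGroupId, formerly a nullable sequenceName
        previousCheckPoint,
        currentCheckPoint
    );
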
diff --git a/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java b/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java
index 0020cf1..661ed17 100644
--- a/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java
+++ b/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java
@@ -35,6 +35,10 @@ import java.io.StringWriter;
  */
 public class EmittingLogger extends Logger
 {
+  public static final String EXCEPTION_TYPE_KEY = "exceptionType";
+  public static final String EXCEPTION_MESSAGE_KEY = "exceptionMessage";
+  public static final String EXCEPTION_STACK_TRACE_KEY = "exceptionStackTrace";
+
   private static volatile ServiceEmitter emitter = null;
 
   private final String className;
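
These constants give alert payloads stable field names for exception details. The usual emitting side is the existing makeAlert idiom, shown here as a sketch (the failing call and dataSource variable are illustrative, and it is assumed the alert builder stores the exception under the keys above):

    private static final EmittingLogger log = new EmittingLogger(KafkaSupervisor.class);

    try {
      handleNotice();  // placeholder for work that may fail
    }
    catch (Exception e) {
      // The emitted alert event carries the exception type, message, and stack
      // trace in its "data" map, which tests can read back via the new keys.
      log.makeAlert(e, "Exception in supervisor thread for dataSource [%s]", dataSource).emit();
    }
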
diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index 26ed99c..0408104 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -83,9 +83,9 @@ public class NoopSupervisorSpec implements SupervisorSpec
 
       @Override
       public void checkpoint(
-          @Nullable String sequenceName,
-          @Nullable DataSourceMetadata previousCheckPoint,
-          @Nullable DataSourceMetadata currentCheckPoint
+          int taskGroupId,
+          DataSourceMetadata previousCheckPoint,
+          DataSourceMetadata currentCheckPoint
       )
       {
 
diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
index 5afe912..04afac7 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
@@ -22,7 +22,6 @@ package io.druid.indexing.overlord.supervisor;
 import com.google.common.collect.ImmutableMap;
 import io.druid.indexing.overlord.DataSourceMetadata;
 
-import javax.annotation.Nullable;
 import java.util.Map;
 
 public interface Supervisor
@@ -52,13 +51,9 @@ public interface Supervisor
   * for example - Kafka Supervisor uses this to merge and handoff segments containing at least the data
    * represented by {@param currentCheckpoint} DataSourceMetadata
    *
-   * @param sequenceName       unique Identifier to figure out for which sequence to do checkpointing
+   * @param taskGroupId        unique Identifier to figure out for which sequence to do checkpointing
    * @param previousCheckPoint DataSourceMetadata checkpointed in previous call
    * @param currentCheckPoint  current DataSourceMetadata to be checkpointed
    */
-  void checkpoint(
-      @Nullable String sequenceName,
-      @Nullable DataSourceMetadata previousCheckPoint,
-      @Nullable DataSourceMetadata currentCheckPoint
-  );
+  void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint);
 }
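
The int-typed taskGroupId matters because a checkpoint notice can reference a group the supervisor no longer tracks. A hedged sketch of the kind of guard an implementation needs (names are illustrative, not the exact KafkaSupervisor code; the message matches what testCheckpointForUnknownTaskGroup asserts):

    // Illustrative only: TaskGroup and the map are supervisor-internal state.
    void handleCheckpointNotice(int taskGroupId, Map<Integer, TaskGroup> taskGroups)
    {
      final TaskGroup group = taskGroups.get(taskGroupId);
      if (group == null) {
        // Surfaces as an alert instead of a NullPointerException further down.
        throw new ISE("WTH?! cannot find taskGroup [%s] among all taskGroups [%s]", taskGroupId, taskGroups);
      }
      // ... checkpoint against the non-null group ...
    }
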
diff --git a/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java b/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java
new file mode 100644
index 0000000..cc217c6
--- /dev/null
+++ b/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.server.metrics;
+
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.core.Event;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+public class ExceptionCapturingServiceEmitter extends ServiceEmitter
+{
+  private volatile Class exceptionClass;
+  private volatile String exceptionMessage;
+  private volatile String stackTrace;
+
+  public ExceptionCapturingServiceEmitter()
+  {
+    super("", "", null);
+  }
+
+  @Override
+  public void emit(Event event)
+  {
+    //noinspection unchecked
+    final Map<String, Object> dataMap = (Map<String, Object>) event.toMap().get("data");
+    final Class exceptionClass = (Class) dataMap.get(EmittingLogger.EXCEPTION_TYPE_KEY);
+    if (exceptionClass != null) {
+      final String exceptionMessage = (String) dataMap.get(EmittingLogger.EXCEPTION_MESSAGE_KEY);
+      final String stackTrace = (String) dataMap.get(EmittingLogger.EXCEPTION_STACK_TRACE_KEY);
+      this.exceptionClass = exceptionClass;
+      this.exceptionMessage = exceptionMessage;
+      this.stackTrace = stackTrace;
+    }
+  }
+
+  @Nullable
+  public Class getExceptionClass()
+  {
+    return exceptionClass;
+  }
+
+  @Nullable
+  public String getExceptionMessage()
+  {
+    return exceptionMessage;
+  }
+
+  @Nullable
+  public String getStackTrace()
+  {
+    return stackTrace;
+  }
+}
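
Typical wiring in a test, sketched from the assertions in the new supervisor tests above (registerEmitter is assumed to be EmittingLogger's existing static registration hook):

    final ExceptionCapturingServiceEmitter serviceEmitter = new ExceptionCapturingServiceEmitter();
    EmittingLogger.registerEmitter(serviceEmitter);

    // ... exercise code that emits an alert on failure ...

    Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
    Assert.assertNotNull(serviceEmitter.getExceptionMessage());
    Assert.assertNotNull(serviceEmitter.getStackTrace());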


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
