jon-wei closed pull request #6188: [Backport] Fix NPE for taskGroupId when rolling update (#6168)
URL: https://github.com/apache/incubator-druid/pull/6188
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff after the merge, so the full diff is reproduced below:

diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
 
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 5f039c0336b..c14354c0d21 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -705,6 +705,7 @@ public void onFailure(Throwable t)
             final CheckPointDataSourceMetadataAction checkpointAction = new 
CheckPointDataSourceMetadataAction(
                 getDataSource(),
                 ioConfig.getTaskGroupId(),
+                getIOConfig().getBaseSequenceName(),
                 new KafkaDataSourceMetadata(new KafkaPartitions(topic, 
sequenceToCheckpoint.getStartOffsets())),
                 new KafkaDataSourceMetadata(new KafkaPartitions(topic, 
nextOffsets))
             );
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
 
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index cb3352d9402..63868230c3d 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -92,6 +92,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
@@ -143,7 +144,7 @@
    * time, there should only be up to a maximum of [taskCount] 
actively-reading task groups (tracked in the [taskGroups]
    * map) + zero or more pending-completion task groups (tracked in 
[pendingCompletionTaskGroups]).
    */
-  private static class TaskGroup
+  private class TaskGroup
   {
     // This specifies the partitions and starting offsets for this task group. 
It is set on group creation from the data
     // in [partitionGroups] and never changes during the lifetime of this task 
group, which will live until a task in
@@ -157,6 +158,7 @@
     final Optional<DateTime> maximumMessageTime;
     DateTime completionTimeout; // is set after signalTasksToFinish(); if not 
done by timeout, take corrective action
     final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new 
TreeMap<>();
+    final String baseSequenceName;
 
     TaskGroup(
         ImmutableMap<Integer, Long> partitionOffsets,
@@ -168,6 +170,7 @@
       this.minimumMessageTime = minimumMessageTime;
       this.maximumMessageTime = maximumMessageTime;
       this.sequenceOffsets.put(0, partitionOffsets);
+      this.baseSequenceName = generateSequenceName(partitionOffsets, 
minimumMessageTime, maximumMessageTime);
     }
 
     int addNewCheckpoint(Map<Integer, Long> checkpoint)
@@ -490,25 +493,29 @@ public void reset(DataSourceMetadata dataSourceMetadata)
   }
 
   @Override
-  public void checkpoint(int taskGroupId, DataSourceMetadata 
previousCheckpoint, DataSourceMetadata currentCheckpoint)
+  public void checkpoint(
+      @Nullable Integer taskGroupId,
+      @Deprecated String baseSequenceName,
+      DataSourceMetadata previousCheckPoint,
+      DataSourceMetadata currentCheckPoint
+  )
   {
-    Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint");
-    Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot 
be null");
+    Preconditions.checkNotNull(previousCheckPoint, "previousCheckpoint");
+    Preconditions.checkNotNull(currentCheckPoint, "current checkpoint cannot 
be null");
     Preconditions.checkArgument(
-        ioConfig.getTopic()
-                .equals(((KafkaDataSourceMetadata) 
currentCheckpoint).getKafkaPartitions()
-                                                                     
.getTopic()),
+        ioConfig.getTopic().equals(((KafkaDataSourceMetadata) 
currentCheckPoint).getKafkaPartitions().getTopic()),
         "Supervisor topic [%s] and topic in checkpoint [%s] does not match",
         ioConfig.getTopic(),
-        ((KafkaDataSourceMetadata) 
currentCheckpoint).getKafkaPartitions().getTopic()
+        ((KafkaDataSourceMetadata) 
currentCheckPoint).getKafkaPartitions().getTopic()
     );
 
-    log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckpoint, 
taskGroupId);
+    log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckPoint, 
taskGroupId);
     notices.add(
         new CheckpointNotice(
             taskGroupId,
-            (KafkaDataSourceMetadata) previousCheckpoint,
-            (KafkaDataSourceMetadata) currentCheckpoint
+            baseSequenceName,
+            (KafkaDataSourceMetadata) previousCheckPoint,
+            (KafkaDataSourceMetadata) currentCheckPoint
         )
     );
   }
@@ -612,17 +619,20 @@ public void handle()
 
   private class CheckpointNotice implements Notice
   {
-    final int taskGroupId;
-    final KafkaDataSourceMetadata previousCheckpoint;
-    final KafkaDataSourceMetadata currentCheckpoint;
+    @Nullable private final Integer nullableTaskGroupId;
+    @Deprecated private final String baseSequenceName;
+    private final KafkaDataSourceMetadata previousCheckpoint;
+    private final KafkaDataSourceMetadata currentCheckpoint;
 
     CheckpointNotice(
-        int taskGroupId,
+        @Nullable Integer nullableTaskGroupId,
+        @Deprecated String baseSequenceName,
         KafkaDataSourceMetadata previousCheckpoint,
         KafkaDataSourceMetadata currentCheckpoint
     )
     {
-      this.taskGroupId = taskGroupId;
+      this.baseSequenceName = baseSequenceName;
+      this.nullableTaskGroupId = nullableTaskGroupId;
       this.previousCheckpoint = previousCheckpoint;
       this.currentCheckpoint = currentCheckpoint;
     }
@@ -630,12 +640,44 @@ public void handle()
     @Override
     public void handle() throws ExecutionException, InterruptedException, 
TimeoutException
     {
+      // Find taskGroupId using taskId if it's null. It can be null while 
rolling update.
+      final int taskGroupId;
+      if (nullableTaskGroupId == null) {
+        // We search taskId in taskGroups and pendingCompletionTaskGroups 
sequentially. This should be fine because
+        // 1) a taskGroup can be moved from taskGroups to 
pendingCompletionTaskGroups in RunNotice
+        //    (see checkTaskDuration()).
+        // 2) Notices are processed by a single thread. So, CheckpointNotice 
and RunNotice cannot be processed at the
+        //    same time.
+        final java.util.Optional<Integer> maybeGroupId = taskGroups
+            .entrySet()
+            .stream()
+            .filter(entry -> {
+              final TaskGroup taskGroup = entry.getValue();
+              return taskGroup.baseSequenceName.equals(baseSequenceName);
+            })
+            .findAny()
+            .map(Entry::getKey);
+        taskGroupId = maybeGroupId.orElse(
+            pendingCompletionTaskGroups
+                .entrySet()
+                .stream()
+                .filter(entry -> {
+                  final List<TaskGroup> taskGroups = entry.getValue();
+                  return taskGroups.stream().anyMatch(group -> 
group.baseSequenceName.equals(baseSequenceName));
+                })
+                .findAny()
+                .orElseThrow(() -> new ISE("Cannot find taskGroup for 
baseSequenceName[%s]", baseSequenceName))
+                .getKey()
+        );
+      } else {
+        taskGroupId = nullableTaskGroupId;
+      }
+
       // check for consistency
       // if already received request for this sequenceName and 
dataSourceMetadata combination then return
-
       final TaskGroup taskGroup = taskGroups.get(taskGroupId);
 
-      if (isValidTaskGroup(taskGroup)) {
+      if (isValidTaskGroup(taskGroupId, taskGroup)) {
         final TreeMap<Integer, Map<Integer, Long>> checkpoints = 
taskGroup.sequenceOffsets;
 
         // check validity of previousCheckpoint
@@ -655,20 +697,13 @@ public void handle() throws ExecutionException, 
InterruptedException, TimeoutExc
           log.info("Already checkpointed with offsets [%s]", 
checkpoints.lastEntry().getValue());
           return;
         }
-        final int taskGroupId = getTaskGroupIdForPartition(
-            currentCheckpoint.getKafkaPartitions()
-                             .getPartitionOffsetMap()
-                             .keySet()
-                             .iterator()
-                             .next()
-        );
         final Map<Integer, Long> newCheckpoint = 
checkpointTaskGroup(taskGroupId, false).get();
         taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
         log.info("Handled checkpoint notice, new checkpoint is [%s] for 
taskGroup [%s]", newCheckpoint, taskGroupId);
       }
     }
 
-    private boolean isValidTaskGroup(@Nullable TaskGroup taskGroup)
+    private boolean isValidTaskGroup(int taskGroupId, @Nullable TaskGroup 
taskGroup)
     {
       if (taskGroup == null) {
         // taskGroup might be in pendingCompletionTaskGroups or partitionGroups
@@ -867,17 +902,6 @@ String generateSequenceName(
     return Joiner.on("_").join("index_kafka", dataSource, hashCode);
   }
 
-  @VisibleForTesting
-  String generateSequenceName(TaskGroup taskGroup)
-  {
-    Preconditions.checkNotNull(taskGroup, "taskGroup cannot be null");
-    return generateSequenceName(
-        taskGroup.partitionOffsets,
-        taskGroup.minimumMessageTime,
-        taskGroup.maximumMessageTime
-    );
-  }
-
   private static String getRandomId()
   {
     final StringBuilder suffix = new StringBuilder(8);
@@ -1748,7 +1772,6 @@ private void createKafkaTasksForGroup(int groupId, int 
replicas) throws JsonProc
       endPartitions.put(partition, Long.MAX_VALUE);
     }
     TaskGroup group = taskGroups.get(groupId);
-    String sequenceName = generateSequenceName(group);
 
     Map<String, String> consumerProperties = 
Maps.newHashMap(ioConfig.getConsumerProperties());
     DateTime minimumMessageTime = 
taskGroups.get(groupId).minimumMessageTime.orNull();
@@ -1756,7 +1779,7 @@ private void createKafkaTasksForGroup(int groupId, int 
replicas) throws JsonProc
 
     KafkaIOConfig kafkaIOConfig = new KafkaIOConfig(
         groupId,
-        sequenceName,
+        group.baseSequenceName,
         new KafkaPartitions(ioConfig.getTopic(), startPartitions),
         new KafkaPartitions(ioConfig.getTopic(), endPartitions),
         consumerProperties,
@@ -1777,10 +1800,10 @@ private void createKafkaTasksForGroup(int groupId, int 
replicas) throws JsonProc
                                             .putAll(spec.getContext())
                                             .build();
     for (int i = 0; i < replicas; i++) {
-      String taskId = Joiner.on("_").join(sequenceName, getRandomId());
+      String taskId = Joiner.on("_").join(group.baseSequenceName, 
getRandomId());
       KafkaIndexTask indexTask = new KafkaIndexTask(
           taskId,
-          new TaskResource(sequenceName, 1),
+          new TaskResource(group.baseSequenceName, 1),
           spec.getDataSchema(),
           taskTuningConfig,
           kafkaIOConfig,
@@ -1909,7 +1932,10 @@ private boolean isTaskCurrent(int taskGroupId, String 
taskId)
 
     String taskSequenceName = ((KafkaIndexTask) 
taskOptional.get()).getIOConfig().getBaseSequenceName();
     if (taskGroups.get(taskGroupId) != null) {
-      return 
generateSequenceName(taskGroups.get(taskGroupId)).equals(taskSequenceName);
+      return Preconditions
+          .checkNotNull(taskGroups.get(taskGroupId), "null taskGroup for 
taskId[%s]", taskGroupId)
+          .baseSequenceName
+          .equals(taskSequenceName);
     } else {
       return generateSequenceName(
           ((KafkaIndexTask) taskOptional.get()).getIOConfig()
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index 2a852d4a8ad..d89f5e2f026 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -1776,7 +1776,8 @@ private void makeToolboxFactory() throws IOException
           @Override
           public boolean checkPointDataSourceMetadata(
               String supervisorId,
-              int taskGroupId,
+              @Nullable Integer taskGroupId,
+              String baseSequenceName,
               @Nullable DataSourceMetadata previousDataSourceMetadata,
               @Nullable DataSourceMetadata currentDataSourceMetadata
           )
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 2add456df8e..5193b5bbab8 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -2103,6 +2103,7 @@ public void testCheckpointForInactiveTaskGroup()
     supervisor.moveTaskGroupToPendingCompletion(0);
     supervisor.checkpoint(
         0,
+        ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
         new KafkaDataSourceMetadata(new KafkaPartitions(topic, 
checkpoints.get(0))),
         new KafkaDataSourceMetadata(new KafkaPartitions(topic, 
fakeCheckpoints))
     );
@@ -2172,6 +2173,7 @@ public void testCheckpointForUnknownTaskGroup() throws 
InterruptedException
 
     supervisor.checkpoint(
         0,
+        ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
         new KafkaDataSourceMetadata(new KafkaPartitions(topic, 
Collections.emptyMap())),
         new KafkaDataSourceMetadata(new KafkaPartitions(topic, 
Collections.emptyMap()))
     );
@@ -2190,13 +2192,100 @@ public void testCheckpointForUnknownTaskGroup() throws 
InterruptedException
     Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
   }
 
+  @Test(timeout = 60_000L)
+  public void testCheckpointWithNullTaskGroupId()
+      throws InterruptedException, ExecutionException, TimeoutException, 
JsonProcessingException
+  {
+    supervisor = getSupervisor(1, 3, true, "PT1S", null, null, false);
+    //not adding any events
+    final Task id1 = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id2 = createKafkaIndexTask(
+        "id2",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id3 = createKafkaIndexTask(
+        "id3",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, 
id3)).anyTimes();
+    
expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    
expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    
expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+    expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+    expect(
+        
indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new
 KafkaDataSourceMetadata(null)
+    ).anyTimes();
+    taskRunner.registerListener(anyObject(TaskRunnerListener.class), 
anyObject(Executor.class));
+    expect(taskClient.getStatusAsync(anyString()))
+        .andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING))
+        .anyTimes();
+    final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+    checkpoints.put(0, ImmutableMap.of(0, 0L));
+    expect(taskClient.getCheckpointsAsync(anyString(), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(3);
+    
expect(taskClient.getStartTimeAsync(anyString())).andReturn(Futures.immediateFuture(DateTimes.nowUtc())).anyTimes();
+    expect(taskClient.pauseAsync(anyString()))
+        .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L)))
+        .anyTimes();
+    expect(taskClient.setEndOffsetsAsync(anyString(), 
EasyMock.eq(ImmutableMap.of(0, 10L)), anyBoolean()))
+        .andReturn(Futures.immediateFuture(true))
+        .anyTimes();
+
+    replayAll();
+
+    supervisor.start();
+
+    supervisor.runInternal();
+
+    final TreeMap<Integer, Map<Integer, Long>> newCheckpoints = new 
TreeMap<>();
+    newCheckpoints.put(0, ImmutableMap.of(0, 10L));
+    supervisor.checkpoint(
+        null,
+        ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, 
checkpoints.get(0))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, 
newCheckpoints.get(0)))
+    );
+
+    while (supervisor.getNoticesQueueSize() > 0) {
+      Thread.sleep(100);
+    }
+
+    verifyAll();
+  }
+
   private void addSomeEvents(int numEventsPerPartition) throws Exception
   {
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = 
kafkaServer.newProducer()) {
       for (int i = 0; i < NUM_PARTITIONS; i++) {
         for (int j = 0; j < numEventsPerPartition; j++) {
           kafkaProducer.send(
-              new ProducerRecord<byte[], byte[]>(
+              new ProducerRecord<>(
                   topic,
                   i,
                   null,
diff --git 
a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
 
b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
index 083ef9e5355..bc1b896d429 100644
--- 
a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
+++ 
b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
@@ -26,23 +26,29 @@
 import io.druid.indexing.overlord.DataSourceMetadata;
 
 import java.io.IOException;
+import javax.annotation.Nullable;
 
 public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
 {
   private final String supervisorId;
-  private final int taskGroupId;
+  @Nullable
+  private final Integer taskGroupId;
+  @Deprecated
+  private final String baseSequenceName;
   private final DataSourceMetadata previousCheckPoint;
   private final DataSourceMetadata currentCheckPoint;
 
   public CheckPointDataSourceMetadataAction(
       @JsonProperty("supervisorId") String supervisorId,
-      @JsonProperty("taskGroupId") Integer taskGroupId,
+      @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable 
for backward compatibility,
+      @JsonProperty("sequenceName") @Deprecated String baseSequenceName, // 
old version would use this
       @JsonProperty("previousCheckPoint") DataSourceMetadata 
previousCheckPoint,
       @JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint
   )
   {
     this.supervisorId = Preconditions.checkNotNull(supervisorId, 
"supervisorId");
-    this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId");
+    this.taskGroupId = taskGroupId;
+    this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, 
"sequenceName");
     this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, 
"previousCheckPoint");
     this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, 
"currentCheckPoint");
   }
@@ -53,8 +59,16 @@ public String getSupervisorId()
     return supervisorId;
   }
 
+  @Deprecated
+  @JsonProperty("sequenceName")
+  public String getBaseSequenceName()
+  {
+    return baseSequenceName;
+  }
+
+  @Nullable
   @JsonProperty
-  public int getTaskGroupId()
+  public Integer getTaskGroupId()
   {
     return taskGroupId;
   }
@@ -87,6 +101,7 @@ public Boolean perform(
     return toolbox.getSupervisorManager().checkPointDataSourceMetadata(
         supervisorId,
         taskGroupId,
+        baseSequenceName,
         previousCheckPoint,
         currentCheckPoint
     );
@@ -103,6 +118,7 @@ public String toString()
   {
     return "CheckPointDataSourceMetadataAction{" +
            "supervisorId='" + supervisorId + '\'' +
+           ", baseSequenceName='" + baseSequenceName + '\'' +
            ", taskGroupId='" + taskGroupId + '\'' +
            ", previousCheckPoint=" + previousCheckPoint +
            ", currentCheckPoint=" + currentCheckPoint +
diff --git 
a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
 
b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
index 31e54cfd521..cf3fe27e9c0 100644
--- 
a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ 
b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -159,7 +159,8 @@ public boolean resetSupervisor(String id, @Nullable 
DataSourceMetadata dataSourc
 
   public boolean checkPointDataSourceMetadata(
       String supervisorId,
-      int taskGroupId,
+      @Nullable Integer taskGroupId,
+      String baseSequenceName,
       DataSourceMetadata previousDataSourceMetadata,
       DataSourceMetadata currentDataSourceMetadata
   )
@@ -172,7 +173,7 @@ public boolean checkPointDataSourceMetadata(
 
       Preconditions.checkNotNull(supervisor, "supervisor could not be found");
 
-      supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata, 
currentDataSourceMetadata);
+      supervisor.lhs.checkpoint(taskGroupId, baseSequenceName, 
previousDataSourceMetadata, currentDataSourceMetadata);
       return true;
     }
     catch (Exception e) {
diff --git 
a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
 
b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index 0ba0701e82d..8f8105ea8ab 100644
--- 
a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ 
b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -83,7 +83,8 @@ public void reset(DataSourceMetadata dataSourceMetadata) {}
 
       @Override
       public void checkpoint(
-          int taskGroupId,
+          @Nullable Integer taskGroupId,
+          String baseSequenceName,
           DataSourceMetadata previousCheckPoint,
           DataSourceMetadata currentCheckPoint
       )
diff --git 
a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java 
b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
index 58e73ff3df7..3f90766e3cd 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
@@ -21,6 +21,8 @@
 
 import io.druid.indexing.overlord.DataSourceMetadata;
 
+import javax.annotation.Nullable;
+
 public interface Supervisor
 {
   void start();
@@ -44,8 +46,14 @@
    * represented by {@param currentCheckpoint} DataSourceMetadata
    *
    * @param taskGroupId        unique Identifier to figure out for which 
sequence to do checkpointing
+   * @param baseSequenceName   baseSequenceName
    * @param previousCheckPoint DataSourceMetadata checkpointed in previous call
    * @param currentCheckPoint  current DataSourceMetadata to be checkpointed
    */
-  void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, 
DataSourceMetadata currentCheckPoint);
+  void checkpoint(
+      @Nullable Integer taskGroupId,
+      @Deprecated String baseSequenceName,
+      DataSourceMetadata previousCheckPoint,
+      DataSourceMetadata currentCheckPoint
+  );
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to