jsun98 commented on a change in pull request #6431: Add Kinesis Indexing 
Service to core Druid
URL: https://github.com/apache/incubator-druid/pull/6431#discussion_r243110291
 
 

 ##########
 File path: 
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
 ##########
 @@ -119,2258 +76,243 @@
  * tasks to satisfy the desired number of replicas. As tasks complete, new 
tasks are queued to process the next range of
  * Kafka offsets.
  */
-public class KafkaSupervisor implements Supervisor
+public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
 {
   private static final EmittingLogger log = new 
EmittingLogger(KafkaSupervisor.class);
-  private static final long MAX_RUN_FREQUENCY_MILLIS = 1000; // prevent us 
from running too often in response to events
-  private static final long NOT_SET = -1;
-  private static final long MINIMUM_FUTURE_TIMEOUT_IN_SECONDS = 120;
   private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
   private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000;
   private static final long INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS = 25000;
-  private static final int MAX_INITIALIZATION_RETRIES = 20;
+  private static final Long NOT_SET = -1L;
+  private static final Long END_OF_PARTITION = Long.MAX_VALUE;
 
-  public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = 
"IS_INCREMENTAL_HANDOFF_SUPPORTED";
-
-  // Internal data structures
-  // --------------------------------------------------------
-
-  /**
-   * A TaskGroup is the main data structure used by KafkaSupervisor to 
organize and monitor Kafka partitions and
-   * indexing tasks. All the tasks in a TaskGroup should always be doing the 
same thing (reading the same partitions and
-   * starting from the same offset) and if [replicas] is configured to be 1, a 
TaskGroup will contain a single task (the
-   * exception being if the supervisor started up and discovered and adopted 
some already running tasks). At any given
-   * time, there should only be up to a maximum of [taskCount] 
actively-reading task groups (tracked in the [taskGroups]
-   * map) + zero or more pending-completion task groups (tracked in 
[pendingCompletionTaskGroups]).
-   */
-  private class TaskGroup
-  {
-    final int groupId;
-
-    // This specifies the partitions and starting offsets for this task group. 
It is set on group creation from the data
-    // in [partitionGroups] and never changes during the lifetime of this task 
group, which will live until a task in
-    // this task group has completed successfully, at which point this will be 
destroyed and a new task group will be
-    // created with new starting offsets. This allows us to create replacement 
tasks for failed tasks that process the
-    // same offsets, even if the values in [partitionGroups] has been changed.
-    final ImmutableMap<Integer, Long> partitionOffsets;
-
-    final ConcurrentHashMap<String, TaskData> tasks = new 
ConcurrentHashMap<>();
-    final Optional<DateTime> minimumMessageTime;
-    final Optional<DateTime> maximumMessageTime;
-    DateTime completionTimeout; // is set after signalTasksToFinish(); if not 
done by timeout, take corrective action
-    final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new 
TreeMap<>();
-    final String baseSequenceName;
-
-    TaskGroup(
-        int groupId,
-        ImmutableMap<Integer, Long> partitionOffsets,
-        Optional<DateTime> minimumMessageTime,
-        Optional<DateTime> maximumMessageTime
-    )
-    {
-      this.groupId = groupId;
-      this.partitionOffsets = partitionOffsets;
-      this.minimumMessageTime = minimumMessageTime;
-      this.maximumMessageTime = maximumMessageTime;
-      this.sequenceOffsets.put(0, partitionOffsets);
-      this.baseSequenceName = generateSequenceName(partitionOffsets, 
minimumMessageTime, maximumMessageTime);
-    }
-
-    int addNewCheckpoint(Map<Integer, Long> checkpoint)
-    {
-      sequenceOffsets.put(sequenceOffsets.lastKey() + 1, checkpoint);
-      return sequenceOffsets.lastKey();
-    }
-
-    Set<String> taskIds()
-    {
-      return tasks.keySet();
-    }
-  }
-
-  private static class TaskData
-  {
-    @Nullable
-    volatile TaskStatus status;
-    @Nullable
-    volatile DateTime startTime;
-    volatile Map<Integer, Long> currentOffsets = new HashMap<>();
-
-    @Override
-    public String toString()
-    {
-      return "TaskData{" +
-             "status=" + status +
-             ", startTime=" + startTime +
-             ", currentOffsets=" + currentOffsets +
-             '}';
-    }
-  }
-
-  // Map<{group ID}, {actively reading task group}>; see documentation for 
TaskGroup class
-  private final ConcurrentHashMap<Integer, TaskGroup> taskGroups = new 
ConcurrentHashMap<>();
-
-  // After telling a taskGroup to stop reading and begin publishing a segment, 
it is moved from [taskGroups] to here so
-  // we can monitor its status while we queue new tasks to read the next range 
of offsets. This is a list since we could
-  // have multiple sets of tasks publishing at once if time-to-publish > 
taskDuration.
-  // Map<{group ID}, List<{pending completion task groups}>>
-  private final ConcurrentHashMap<Integer, CopyOnWriteArrayList<TaskGroup>> 
pendingCompletionTaskGroups = new ConcurrentHashMap<>();
-
-  // The starting offset for a new partition in [partitionGroups] is initially 
set to NOT_SET. When a new task group
-  // is created and is assigned partitions, if the offset in [partitionGroups] 
is NOT_SET it will take the starting
-  // offset value from the metadata store, and if it can't find it there, from 
Kafka. Once a task begins
-  // publishing, the offset in partitionGroups will be updated to the ending 
offset of the publishing-but-not-yet-
-  // completed task, which will cause the next set of tasks to begin reading 
from where the previous task left
-  // off. If that previous task now fails, we will set the offset in 
[partitionGroups] back to NOT_SET which will
-  // cause successive tasks to again grab their starting offset from metadata 
store. This mechanism allows us to
-  // start up successive tasks without waiting for the previous tasks to 
succeed and still be able to handle task
-  // failures during publishing.
-  // Map<{group ID}, Map<{partition ID}, {startingOffset}>>
-  private final ConcurrentHashMap<Integer, ConcurrentHashMap<Integer, Long>> 
partitionGroups = new ConcurrentHashMap<>();
-  // --------------------------------------------------------
-
-  private final TaskStorage taskStorage;
-  private final TaskMaster taskMaster;
-  private final IndexerMetadataStorageCoordinator 
indexerMetadataStorageCoordinator;
-  private final KafkaIndexTaskClient taskClient;
-  private final ObjectMapper sortingMapper;
-  private final KafkaSupervisorSpec spec;
-  private final ServiceEmitter emitter;
-  private final DruidMonitorSchedulerConfig monitorSchedulerConfig;
-  private final String dataSource;
-  private final KafkaSupervisorIOConfig ioConfig;
-  private final KafkaSupervisorTuningConfig tuningConfig;
-  private final KafkaTuningConfig taskTuningConfig;
-  private final String supervisorId;
-  private final TaskInfoProvider taskInfoProvider;
-  private final long futureTimeoutInSeconds; // how long to wait for async 
operations to complete
-  private final RowIngestionMetersFactory rowIngestionMetersFactory;
-
-  private final ExecutorService exec;
-  private final ScheduledExecutorService scheduledExec;
-  private final ScheduledExecutorService reportingExec;
-  private final ListeningExecutorService workerExec;
-  private final BlockingQueue<Notice> notices = new LinkedBlockingDeque<>();
-  private final Object stopLock = new Object();
-  private final Object stateChangeLock = new Object();
-  private final Object consumerLock = new Object();
-
-  private boolean listenerRegistered = false;
-  private long lastRunTime;
-
-  private int initRetryCounter = 0;
-
-  private volatile DateTime firstRunTime;
-  private volatile KafkaConsumer consumer;
-
-  private volatile boolean lifecycleStarted = false;
-  private volatile boolean started = false;
-  private volatile boolean stopped = false;
-  private volatile Map<Integer, Long> latestOffsetsFromKafka;
-  private volatile DateTime offsetsLastUpdated;
-
-  public KafkaSupervisor(
-      final TaskStorage taskStorage,
-      final TaskMaster taskMaster,
-      final IndexerMetadataStorageCoordinator 
indexerMetadataStorageCoordinator,
-      final KafkaIndexTaskClientFactory taskClientFactory,
-      final ObjectMapper mapper,
-      final KafkaSupervisorSpec spec,
-      final RowIngestionMetersFactory rowIngestionMetersFactory
-  )
-  {
-    this.taskStorage = taskStorage;
-    this.taskMaster = taskMaster;
-    this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
-    this.sortingMapper = 
mapper.copy().configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
-    this.spec = spec;
-    this.emitter = spec.getEmitter();
-    this.monitorSchedulerConfig = spec.getMonitorSchedulerConfig();
-    this.rowIngestionMetersFactory = rowIngestionMetersFactory;
-
-    this.dataSource = spec.getDataSchema().getDataSource();
-    this.ioConfig = spec.getIoConfig();
-    this.tuningConfig = spec.getTuningConfig();
-    this.taskTuningConfig = KafkaTuningConfig.copyOf(this.tuningConfig);
-    this.supervisorId = StringUtils.format("KafkaSupervisor-%s", dataSource);
-    this.exec = Execs.singleThreaded(supervisorId);
-    this.scheduledExec = Execs.scheduledSingleThreaded(supervisorId + 
"-Scheduler-%d");
-    this.reportingExec = Execs.scheduledSingleThreaded(supervisorId + 
"-Reporting-%d");
-
-    int workerThreads = (this.tuningConfig.getWorkerThreads() != null
-                         ? this.tuningConfig.getWorkerThreads()
-                         : Math.min(10, this.ioConfig.getTaskCount()));
-    this.workerExec = 
MoreExecutors.listeningDecorator(Execs.multiThreaded(workerThreads, 
supervisorId + "-Worker-%d"));
-    log.info("Created worker pool with [%d] threads for dataSource [%s]", 
workerThreads, this.dataSource);
-
-    this.taskInfoProvider = new TaskInfoProvider()
-    {
-      @Override
-      public TaskLocation getTaskLocation(final String id)
-      {
-        Preconditions.checkNotNull(id, "id");
-        Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner();
-        if (taskRunner.isPresent()) {
-          Optional<? extends TaskRunnerWorkItem> item = Iterables.tryFind(
-              taskRunner.get().getRunningTasks(),
-              (Predicate<TaskRunnerWorkItem>) taskRunnerWorkItem -> 
id.equals(taskRunnerWorkItem.getTaskId())
-          );
-
-          if (item.isPresent()) {
-            return item.get().getLocation();
-          }
-        } else {
-          log.error("Failed to get task runner because I'm not the leader!");
-        }
-
-        return TaskLocation.unknown();
-      }
-
-      @Override
-      public Optional<TaskStatus> getTaskStatus(String id)
-      {
-        return taskStorage.getStatus(id);
-      }
-    };
-
-    this.futureTimeoutInSeconds = Math.max(
-        MINIMUM_FUTURE_TIMEOUT_IN_SECONDS,
-        tuningConfig.getChatRetries() * 
(tuningConfig.getHttpTimeout().getStandardSeconds()
-                                         + 
IndexTaskClient.MAX_RETRY_WAIT_SECONDS)
-    );
-
-    int chatThreads = (this.tuningConfig.getChatThreads() != null
-                       ? this.tuningConfig.getChatThreads()
-                       : Math.min(10, this.ioConfig.getTaskCount() * 
this.ioConfig.getReplicas()));
-    this.taskClient = taskClientFactory.build(
-        taskInfoProvider,
-        dataSource,
-        chatThreads,
-        this.tuningConfig.getHttpTimeout(),
-        this.tuningConfig.getChatRetries()
-    );
-    log.info(
-        "Created taskClient with dataSource[%s] chatThreads[%d] 
httpTimeout[%s] chatRetries[%d]",
-        dataSource,
-        chatThreads,
-        this.tuningConfig.getHttpTimeout(),
-        this.tuningConfig.getChatRetries()
-    );
-  }
-
-  @Override
-  public void start()
-  {
-    synchronized (stateChangeLock) {
-      Preconditions.checkState(!lifecycleStarted, "already started");
-      Preconditions.checkState(!exec.isShutdown(), "already stopped");
-
-      // Try normal initialization first, if that fails then schedule periodic 
initialization retries
-      try {
-        tryInit();
-      }
-      catch (Exception e) {
-        if (!started) {
-          log.warn("First initialization attempt failed for 
KafkaSupervisor[%s], starting retries...", dataSource);
-
-          exec.submit(
-              () -> {
-                try {
-                  RetryUtils.retry(
-                      () -> {
-                        tryInit();
-                        return 0;
-                      },
-                      (throwable) -> {
-                        return !started;
-                      },
-                      0,
-                      MAX_INITIALIZATION_RETRIES,
-                      null,
-                      null
-                  );
-                }
-                catch (Exception e2) {
-                  log.makeAlert(
-                      "Failed to initialize after %s retries, aborting. Please 
resubmit the supervisor spec to restart this supervisor [%s]",
-                      MAX_INITIALIZATION_RETRIES,
-                      supervisorId
-                  ).emit();
-                  throw new RuntimeException(e2);
-                }
-              }
-          );
-        }
-      }
-
-      lifecycleStarted = true;
-    }
-  }
-
-  @Override
-  public void stop(boolean stopGracefully)
-  {
-    synchronized (stateChangeLock) {
-      Preconditions.checkState(lifecycleStarted, "lifecycle not started");
-
-      log.info("Beginning shutdown of KafkaSupervisor[%s]", dataSource);
-
-      try {
-        scheduledExec.shutdownNow(); // stop recurring executions
-        reportingExec.shutdownNow();
-
-        if (started) {
-          Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner();
-          if (taskRunner.isPresent()) {
-            taskRunner.get().unregisterListener(supervisorId);
-          }
-
-          // Stopping gracefully will synchronize the end offsets of the tasks 
and signal them to publish, and will block
-          // until the tasks have acknowledged or timed out. We want this 
behavior when we're explicitly shut down through
-          // the API, but if we shut down for other reasons (e.g. we lose 
leadership) we want to just stop and leave the
-          // tasks as they are.
-          synchronized (stopLock) {
-            if (stopGracefully) {
-              log.info("Posting GracefulShutdownNotice, signalling managed 
tasks to complete and publish");
-              notices.add(new GracefulShutdownNotice());
-            } else {
-              log.info("Posting ShutdownNotice");
-              notices.add(new ShutdownNotice());
-            }
-
-            long shutdownTimeoutMillis = 
tuningConfig.getShutdownTimeout().getMillis();
-            long endTime = System.currentTimeMillis() + shutdownTimeoutMillis;
-            while (!stopped) {
-              long sleepTime = endTime - System.currentTimeMillis();
-              if (sleepTime <= 0) {
-                log.info("Timed out while waiting for shutdown (timeout 
[%,dms])", shutdownTimeoutMillis);
-                stopped = true;
-                break;
-              }
-              stopLock.wait(sleepTime);
-            }
-          }
-          log.info("Shutdown notice handled");
-        }
-
-        taskClient.close();
-        workerExec.shutdownNow();
-        exec.shutdownNow();
-        started = false;
-
-        log.info("KafkaSupervisor[%s] has stopped", dataSource);
-      }
-      catch (Exception e) {
-        log.makeAlert(e, "Exception stopping KafkaSupervisor[%s]", dataSource)
-           .emit();
-      }
-    }
-  }
-
-  private boolean someTaskGroupsPendingCompletion(Integer groupId)
-  {
-    CopyOnWriteArrayList<TaskGroup> taskGroups = 
pendingCompletionTaskGroups.get(groupId);
-    return taskGroups != null && taskGroups.size() > 0;
-  }
-
-  @Override
-  public SupervisorReport getStatus()
-  {
-    return generateReport(true);
-  }
-
-  @Override
-  public Map<String, Map<String, Object>> getStats()
-  {
-    try {
-      return getCurrentTotalStats();
-    }
-    catch (InterruptedException ie) {
-      Thread.currentThread().interrupt();
-      log.error(ie, "getStats() interrupted.");
-      throw new RuntimeException(ie);
-    }
-    catch (ExecutionException | TimeoutException eete) {
-      throw new RuntimeException(eete);
-    }
-  }
-
-  @Override
-  public void reset(DataSourceMetadata dataSourceMetadata)
-  {
-    log.info("Posting ResetNotice");
-    notices.add(new ResetNotice(dataSourceMetadata));
-  }
-
-  @Override
-  public void checkpoint(
-      @Nullable Integer taskGroupId,
-      @Deprecated String baseSequenceName,
-      DataSourceMetadata previousCheckPoint,
-      DataSourceMetadata currentCheckPoint
-  )
-  {
-    Preconditions.checkNotNull(previousCheckPoint, "previousCheckpoint");
-    Preconditions.checkNotNull(currentCheckPoint, "current checkpoint cannot 
be null");
-    Preconditions.checkArgument(
-        ioConfig.getTopic().equals(((KafkaDataSourceMetadata) 
currentCheckPoint).getKafkaPartitions().getTopic()),
-        "Supervisor topic [%s] and topic in checkpoint [%s] does not match",
-        ioConfig.getTopic(),
-        ((KafkaDataSourceMetadata) 
currentCheckPoint).getKafkaPartitions().getTopic()
-    );
-
-    log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckPoint, 
taskGroupId);
-    notices.add(
-        new CheckpointNotice(
-            taskGroupId,
-            baseSequenceName,
-            (KafkaDataSourceMetadata) previousCheckPoint,
-            (KafkaDataSourceMetadata) currentCheckPoint
-        )
-    );
-  }
-
-  public void possiblyRegisterListener()
-  {
-    // getTaskRunner() sometimes fails if the task queue is still being 
initialized so retry later until we succeed
-
-    if (listenerRegistered) {
-      return;
-    }
-
-    Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner();
-    if (taskRunner.isPresent()) {
-      taskRunner.get().registerListener(
-          new TaskRunnerListener()
-          {
-            @Override
-            public String getListenerId()
-            {
-              return supervisorId;
-            }
-
-            @Override
-            public void locationChanged(final String taskId, final 
TaskLocation newLocation)
-            {
-              // do nothing
-            }
-
-            @Override
-            public void statusChanged(String taskId, TaskStatus status)
-            {
-              notices.add(new RunNotice());
-            }
-          }, MoreExecutors.sameThreadExecutor()
-      );
-
-      listenerRegistered = true;
-    }
-  }
-
-  private interface Notice
-  {
-    void handle() throws ExecutionException, InterruptedException, 
TimeoutException, JsonProcessingException;
-  }
-
-  private class RunNotice implements Notice
-  {
-    @Override
-    public void handle() throws ExecutionException, InterruptedException, 
TimeoutException, JsonProcessingException
-    {
-      long nowTime = System.currentTimeMillis();
-      if (nowTime - lastRunTime < MAX_RUN_FREQUENCY_MILLIS) {
-        return;
-      }
-      lastRunTime = nowTime;
-
-      runInternal();
-    }
-  }
-
-  private class GracefulShutdownNotice extends ShutdownNotice
-  {
-    @Override
-    public void handle() throws InterruptedException, ExecutionException, 
TimeoutException
-    {
-      gracefulShutdownInternal();
-      super.handle();
-    }
-  }
-
-  private class ShutdownNotice implements Notice
-  {
-    @Override
-    public void handle() throws InterruptedException, ExecutionException, 
TimeoutException
-    {
-      consumer.close();
-
-      synchronized (stopLock) {
-        stopped = true;
-        stopLock.notifyAll();
-      }
-    }
-  }
-
-  private class ResetNotice implements Notice
-  {
-    final DataSourceMetadata dataSourceMetadata;
-
-    ResetNotice(DataSourceMetadata dataSourceMetadata)
-    {
-      this.dataSourceMetadata = dataSourceMetadata;
-    }
-
-    @Override
-    public void handle()
-    {
-      resetInternal(dataSourceMetadata);
-    }
-  }
-
-  private class CheckpointNotice implements Notice
-  {
-    @Nullable
-    private final Integer nullableTaskGroupId;
-    @Deprecated
-    private final String baseSequenceName;
-    private final KafkaDataSourceMetadata previousCheckpoint;
-    private final KafkaDataSourceMetadata currentCheckpoint;
-
-    CheckpointNotice(
-        @Nullable Integer nullableTaskGroupId,
-        @Deprecated String baseSequenceName,
-        KafkaDataSourceMetadata previousCheckpoint,
-        KafkaDataSourceMetadata currentCheckpoint
-    )
-    {
-      this.baseSequenceName = baseSequenceName;
-      this.nullableTaskGroupId = nullableTaskGroupId;
-      this.previousCheckpoint = previousCheckpoint;
-      this.currentCheckpoint = currentCheckpoint;
-    }
-
-    @Override
-    public void handle() throws ExecutionException, InterruptedException
-    {
-      // Find taskGroupId using taskId if it's null. It can be null while 
rolling update.
-      final int taskGroupId;
-      if (nullableTaskGroupId == null) {
-        // We search taskId in taskGroups and pendingCompletionTaskGroups 
sequentially. This should be fine because
-        // 1) a taskGroup can be moved from taskGroups to 
pendingCompletionTaskGroups in RunNotice
-        //    (see checkTaskDuration()).
-        // 2) Notices are proceesed by a single thread. So, CheckpointNotice 
and RunNotice cannot be processed at the
-        //    same time.
-        final java.util.Optional<Integer> maybeGroupId = taskGroups
-            .entrySet()
-            .stream()
-            .filter(entry -> {
-              final TaskGroup taskGroup = entry.getValue();
-              return taskGroup.baseSequenceName.equals(baseSequenceName);
-            })
-            .findAny()
-            .map(Entry::getKey);
-
-        if (maybeGroupId.isPresent()) {
-          taskGroupId = maybeGroupId.get();
-        } else {
-          taskGroupId = pendingCompletionTaskGroups
-              .entrySet()
-              .stream()
-              .filter(entry -> {
-                final List<TaskGroup> taskGroups = entry.getValue();
-                return taskGroups.stream().anyMatch(group -> 
group.baseSequenceName.equals(baseSequenceName));
-              })
-              .findAny()
-              .orElseThrow(() -> new ISE("Cannot find taskGroup for 
baseSequenceName[%s]", baseSequenceName))
-              .getKey();
-        }
-      } else {
-        taskGroupId = nullableTaskGroupId;
-      }
-
-      // check for consistency
-      // if already received request for this sequenceName and 
dataSourceMetadata combination then return
-      final TaskGroup taskGroup = taskGroups.get(taskGroupId);
-
-      if (isValidTaskGroup(taskGroupId, taskGroup)) {
-        final TreeMap<Integer, Map<Integer, Long>> checkpoints = 
taskGroup.sequenceOffsets;
-
-        // check validity of previousCheckpoint
-        int index = checkpoints.size();
-        for (int sequenceId : checkpoints.descendingKeySet()) {
-          Map<Integer, Long> checkpoint = checkpoints.get(sequenceId);
-          // We have already verified the topic of the current checkpoint is 
same with that in ioConfig.
-          // See checkpoint().
-          if 
(checkpoint.equals(previousCheckpoint.getKafkaPartitions().getPartitionOffsetMap()))
 {
-            break;
-          }
-          index--;
-        }
-        if (index == 0) {
-          throw new ISE("No such previous checkpoint [%s] found", 
previousCheckpoint);
-        } else if (index < checkpoints.size()) {
-          // if the found checkpoint is not the latest one then already 
checkpointed by a replica
-          Preconditions.checkState(index == checkpoints.size() - 1, 
"checkpoint consistency failure");
-          log.info("Already checkpointed with offsets [%s]", 
checkpoints.lastEntry().getValue());
-          return;
-        }
-        final Map<Integer, Long> newCheckpoint = 
checkpointTaskGroup(taskGroup, false).get();
-        taskGroup.addNewCheckpoint(newCheckpoint);
-        log.info("Handled checkpoint notice, new checkpoint is [%s] for 
taskGroup [%s]", newCheckpoint, taskGroupId);
-      }
-    }
-
-    private boolean isValidTaskGroup(int taskGroupId, @Nullable TaskGroup 
taskGroup)
-    {
-      if (taskGroup == null) {
-        // taskGroup might be in pendingCompletionTaskGroups or partitionGroups
-        if (pendingCompletionTaskGroups.containsKey(taskGroupId)) {
-          log.warn(
-              "Ignoring checkpoint request because taskGroup[%d] has already 
stopped indexing and is waiting for "
-              + "publishing segments",
-              taskGroupId
-          );
-          return false;
-        } else if (partitionGroups.containsKey(taskGroupId)) {
-          log.warn("Ignoring checkpoint request because taskGroup[%d] is 
inactive", taskGroupId);
-          return false;
-        } else {
-          throw new ISE("WTH?! cannot find taskGroup [%s] among all taskGroups 
[%s]", taskGroupId, taskGroups);
-        }
-      }
-
-      return true;
-    }
-  }
-
-  @VisibleForTesting
-  void resetInternal(DataSourceMetadata dataSourceMetadata)
-  {
-    if (dataSourceMetadata == null) {
-      // Reset everything
-      boolean result = 
indexerMetadataStorageCoordinator.deleteDataSourceMetadata(dataSource);
-      log.info("Reset dataSource[%s] - dataSource metadata entry deleted? 
[%s]", dataSource, result);
-      taskGroups.values().forEach(group -> killTasksInGroup(group, 
"DataSourceMetadata is not found while reset"));
-      taskGroups.clear();
-      partitionGroups.clear();
-    } else if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) {
-      throw new IAE("Expected KafkaDataSourceMetadata but found instance of 
[%s]", dataSourceMetadata.getClass());
-    } else {
-      // Reset only the partitions in dataSourceMetadata if it has not been 
reset yet
-      final KafkaDataSourceMetadata resetKafkaMetadata = 
(KafkaDataSourceMetadata) dataSourceMetadata;
-
-      if 
(resetKafkaMetadata.getKafkaPartitions().getTopic().equals(ioConfig.getTopic()))
 {
-        // metadata can be null
-        final DataSourceMetadata metadata = 
indexerMetadataStorageCoordinator.getDataSourceMetadata(dataSource);
-        if (metadata != null && !(metadata instanceof 
KafkaDataSourceMetadata)) {
-          throw new IAE(
-              "Expected KafkaDataSourceMetadata from metadata store but found 
instance of [%s]",
-              metadata.getClass()
-          );
-        }
-        final KafkaDataSourceMetadata currentMetadata = 
(KafkaDataSourceMetadata) metadata;
-
-        // defend against consecutive reset requests from replicas
-        // as well as the case where the metadata store do not have an entry 
for the reset partitions
-        boolean doReset = false;
-        for (Entry<Integer, Long> resetPartitionOffset : 
resetKafkaMetadata.getKafkaPartitions()
-                                                                           
.getPartitionOffsetMap()
-                                                                           
.entrySet()) {
-          final Long partitionOffsetInMetadataStore = currentMetadata == null
-                                                      ? null
-                                                      : 
currentMetadata.getKafkaPartitions()
-                                                                       
.getPartitionOffsetMap()
-                                                                       
.get(resetPartitionOffset.getKey());
-          final TaskGroup partitionTaskGroup = taskGroups.get(
-              getTaskGroupIdForPartition(resetPartitionOffset.getKey())
-          );
-          final boolean isSameOffset = partitionTaskGroup != null
-                                       && 
partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey())
-                                                                             
.equals(resetPartitionOffset.getValue());
-          if (partitionOffsetInMetadataStore != null || isSameOffset) {
-            doReset = true;
-            break;
-          }
-        }
-
-        if (!doReset) {
-          log.info("Ignoring duplicate reset request [%s]", 
dataSourceMetadata);
-          return;
-        }
-
-        boolean metadataUpdateSuccess = false;
-        if (currentMetadata == null) {
-          metadataUpdateSuccess = true;
-        } else {
-          final DataSourceMetadata newMetadata = 
currentMetadata.minus(resetKafkaMetadata);
-          try {
-            metadataUpdateSuccess = 
indexerMetadataStorageCoordinator.resetDataSourceMetadata(dataSource, 
newMetadata);
-          }
-          catch (IOException e) {
-            log.error("Resetting DataSourceMetadata failed [%s]", 
e.getMessage());
-            Throwables.propagate(e);
-          }
-        }
-        if (metadataUpdateSuccess) {
-          
resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition
 -> {
-            final int groupId = getTaskGroupIdForPartition(partition);
-            killTaskGroupForPartitions(ImmutableSet.of(partition), 
"DataSourceMetadata is updated while reset");
-            taskGroups.remove(groupId);
-            partitionGroups.get(groupId).replaceAll((partitionId, offset) -> 
NOT_SET);
-          });
-        } else {
-          throw new ISE("Unable to reset metadata");
-        }
-      } else {
-        log.warn(
-            "Reset metadata topic [%s] and supervisor's topic [%s] do not 
match",
-            resetKafkaMetadata.getKafkaPartitions().getTopic(),
-            ioConfig.getTopic()
-        );
-      }
-    }
-  }
-
-  private void killTaskGroupForPartitions(Set<Integer> partitions, String 
reasonFormat, Object... args)
-  {
-    for (Integer partition : partitions) {
-      killTasksInGroup(taskGroups.get(getTaskGroupIdForPartition(partition)), 
reasonFormat, args);
-    }
-  }
-
-  private void killTasksInGroup(TaskGroup taskGroup, String reasonFormat, 
Object... args)
-  {
-    if (taskGroup != null) {
-      for (String taskId : taskGroup.tasks.keySet()) {
-        killTask(taskId, reasonFormat, args);
-      }
-    }
-  }
-
-  @VisibleForTesting
-  void gracefulShutdownInternal() throws ExecutionException, 
InterruptedException, TimeoutException
-  {
-    // Prepare for shutdown by 1) killing all tasks that haven't been assigned 
to a worker yet, and 2) causing all
-    // running tasks to begin publishing by setting their startTime to a very 
long time ago so that the logic in
-    // checkTaskDuration() will be triggered. This is better than just telling 
these tasks to publish whatever they
-    // have, as replicas that are supposed to publish the same segment may not 
have read the same set of offsets.
-    for (TaskGroup taskGroup : taskGroups.values()) {
-      for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
-        if 
(taskInfoProvider.getTaskLocation(entry.getKey()).equals(TaskLocation.unknown()))
 {
-          killTask(entry.getKey(), "Killing task for graceful shutdown");
-        } else {
-          entry.getValue().startTime = DateTimes.EPOCH;
-        }
-      }
-    }
-
-    checkTaskDuration();
-  }
-
-  @VisibleForTesting
-  void runInternal() throws ExecutionException, InterruptedException, 
TimeoutException, JsonProcessingException
-  {
-    possiblyRegisterListener();
-    updatePartitionDataFromKafka();
-    discoverTasks();
-    updateTaskStatus();
-    checkTaskDuration();
-    checkPendingCompletionTasks();
-    checkCurrentTaskState();
-
-    // if supervisor is not suspended, ensure required tasks are running
-    // if suspended, ensure tasks have been requested to gracefully stop
-    if (!spec.isSuspended()) {
-      log.info("[%s] supervisor is running.", dataSource);
-      createNewTasks();
-    } else {
-      log.info("[%s] supervisor is suspended.", dataSource);
-      gracefulShutdownInternal();
-    }
-
-    if (log.isDebugEnabled()) {
-      log.debug(generateReport(true).toString());
-    } else {
-      log.info(generateReport(false).toString());
-    }
-  }
-
-  String generateSequenceName(
-      Map<Integer, Long> startPartitions,
-      Optional<DateTime> minimumMessageTime,
-      Optional<DateTime> maximumMessageTime
-  )
-  {
-    StringBuilder sb = new StringBuilder();
-
-    for (Entry<Integer, Long> entry : startPartitions.entrySet()) {
-      sb.append(StringUtils.format("+%d(%d)", entry.getKey(), 
entry.getValue()));
-    }
-    String partitionOffsetStr = sb.toString().substring(1);
-
-    String minMsgTimeStr = (minimumMessageTime.isPresent() ? 
String.valueOf(minimumMessageTime.get().getMillis()) : "");
-    String maxMsgTimeStr = (maximumMessageTime.isPresent() ? 
String.valueOf(maximumMessageTime.get().getMillis()) : "");
-
-    String dataSchema, tuningConfig;
-    try {
-      dataSchema = sortingMapper.writeValueAsString(spec.getDataSchema());
-      tuningConfig = sortingMapper.writeValueAsString(taskTuningConfig);
-    }
-    catch (JsonProcessingException e) {
-      throw Throwables.propagate(e);
-    }
-
-    String hashCode = DigestUtils.sha1Hex(dataSchema
-                                          + tuningConfig
-                                          + partitionOffsetStr
-                                          + minMsgTimeStr
-                                          + maxMsgTimeStr)
-                                 .substring(0, 15);
-
-    return Joiner.on("_").join("index_kafka", dataSource, hashCode);
-  }
-
-  @VisibleForTesting
-  protected void tryInit()
-  {
-    synchronized (stateChangeLock) {
-      if (started) {
-        log.warn("SUpervisor was already started, skipping init");
-        return;
-      }
-
-      if (stopped) {
-        log.warn("Supervisor was already stopped, skipping init.");
-        return;
-      }
-
-      try {
-        consumer = getKafkaConsumer();
-
-        exec.submit(
-            () -> {
-              try {
-                long pollTimeout = Math.max(ioConfig.getPeriod().getMillis(), 
MAX_RUN_FREQUENCY_MILLIS);
-                while (!Thread.currentThread().isInterrupted() && !stopped) {
-                  final Notice notice = notices.poll(pollTimeout, 
TimeUnit.MILLISECONDS);
-                  if (notice == null) {
-                    continue;
-                  }
-
-                  try {
-                    notice.handle();
-                  }
-                  catch (Throwable e) {
-                    log.makeAlert(e, "KafkaSupervisor[%s] failed to handle 
notice", dataSource)
-                       .addData("noticeClass", 
notice.getClass().getSimpleName())
-                       .emit();
-                  }
-                }
-              }
-              catch (InterruptedException e) {
-                log.info("KafkaSupervisor[%s] interrupted, exiting", 
dataSource);
-              }
-            }
-        );
-        firstRunTime = DateTimes.nowUtc().plus(ioConfig.getStartDelay());
-        scheduledExec.scheduleAtFixedRate(
-            buildRunTask(),
-            ioConfig.getStartDelay().getMillis(),
-            Math.max(ioConfig.getPeriod().getMillis(), 
MAX_RUN_FREQUENCY_MILLIS),
-            TimeUnit.MILLISECONDS
-        );
-
-        reportingExec.scheduleAtFixedRate(
-            updateCurrentAndLatestOffsets(),
-            ioConfig.getStartDelay().getMillis() + 
INITIAL_GET_OFFSET_DELAY_MILLIS, // wait for tasks to start up
-            Math.max(
-                tuningConfig.getOffsetFetchPeriod().getMillis(), 
MINIMUM_GET_OFFSET_PERIOD_MILLIS
-            ),
-            TimeUnit.MILLISECONDS
-        );
-
-        reportingExec.scheduleAtFixedRate(
-            emitLag(),
-            ioConfig.getStartDelay().getMillis() + 
INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up
-            monitorSchedulerConfig.getEmitterPeriod().getMillis(),
-            TimeUnit.MILLISECONDS
-        );
-
-        started = true;
-        log.info(
-            "Started KafkaSupervisor[%s], first run in [%s], with spec: [%s]",
-            dataSource,
-            ioConfig.getStartDelay(),
-            spec.toString()
-        );
-      }
-      catch (Exception e) {
-        if (consumer != null) {
-          consumer.close();
-        }
-        initRetryCounter++;
-        log.makeAlert(e, "Exception starting KafkaSupervisor[%s]", dataSource)
-           .emit();
-
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  private KafkaConsumer<byte[], byte[]> getKafkaConsumer()
-  {
-    final Properties props = new Properties();
-
-    props.setProperty("metadata.max.age.ms", "10000");
-    props.setProperty("group.id", StringUtils.format("kafka-supervisor-%s", 
RealtimeIndexTask.makeRandomId()));
-
-    KafkaIndexTask.addConsumerPropertiesFromConfig(props, sortingMapper, 
ioConfig.getConsumerProperties());
-
-    props.setProperty("enable.auto.commit", "false");
-
-    ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
-    try {
-      
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-      return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new 
ByteArrayDeserializer());
-    }
-    finally {
-      Thread.currentThread().setContextClassLoader(currCtxCl);
-    }
-  }
-
-  private void updatePartitionDataFromKafka()
-  {
-    List<PartitionInfo> partitions;
-    try {
-      synchronized (consumerLock) {
-        partitions = consumer.partitionsFor(ioConfig.getTopic());
-      }
-    }
-    catch (Exception e) {
-      log.warn(
-          e,
-          "Unable to get partition data from Kafka for brokers [%s], are the 
brokers up?",
-          
ioConfig.getConsumerProperties().get(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY)
-      );
-      return;
-    }
-
-    int numPartitions = (partitions != null ? partitions.size() : 0);
-
-    log.debug("Found [%d] Kafka partitions for topic [%s]", numPartitions, 
ioConfig.getTopic());
-
-    for (int partition = 0; partition < numPartitions; partition++) {
-      int taskGroupId = getTaskGroupIdForPartition(partition);
-
-      ConcurrentHashMap<Integer, Long> partitionMap = 
partitionGroups.computeIfAbsent(
-          taskGroupId,
-          k -> new ConcurrentHashMap<>()
-      );
-
-      // The starting offset for a new partition in [partitionGroups] is 
initially set to NOT_SET; when a new task group
-      // is created and is assigned partitions, if the offset in 
[partitionGroups] is NOT_SET it will take the starting
-      // offset value from the metadata store, and if it can't find it there, 
from Kafka. Once a task begins
-      // publishing, the offset in partitionGroups will be updated to the 
ending offset of the publishing-but-not-yet-
-      // completed task, which will cause the next set of tasks to begin 
reading from where the previous task left
-      // off. If that previous task now fails, we will set the offset in 
[partitionGroups] back to NOT_SET which will
-      // cause successive tasks to again grab their starting offset from 
metadata store. This mechanism allows us to
-      // start up successive tasks without waiting for the previous tasks to 
succeed and still be able to handle task
-      // failures during publishing.
-      if (partitionMap.putIfAbsent(partition, NOT_SET) == null) {
-        log.info(
-            "New partition [%d] discovered for topic [%s], added to task group 
[%d]",
-            partition,
-            ioConfig.getTopic(),
-            taskGroupId
-        );
-      }
-    }
-  }
-
-  private void discoverTasks() throws ExecutionException, 
InterruptedException, TimeoutException
-  {
-    int taskCount = 0;
-    List<String> futureTaskIds = new ArrayList<>();
-    List<ListenableFuture<Boolean>> futures = new ArrayList<>();
-    List<Task> tasks = taskStorage.getActiveTasks();
-    final Map<Integer, TaskGroup> taskGroupsToVerify = new HashMap<>();
-
-    for (Task task : tasks) {
-      if (!(task instanceof KafkaIndexTask) || 
!dataSource.equals(task.getDataSource())) {
-        continue;
-      }
-
-      taskCount++;
-      final KafkaIndexTask kafkaTask = (KafkaIndexTask) task;
-      final String taskId = task.getId();
-
-      // Determine which task group this task belongs to based on one of the 
partitions handled by this task. If we
-      // later determine that this task is actively reading, we will make sure 
that it matches our current partition
-      // allocation (getTaskGroupIdForPartition(partition) should return the 
same value for every partition being read
-      // by this task) and kill it if it is not compatible. If the task is 
instead found to be in the publishing
-      // state, we will permit it to complete even if it doesn't match our 
current partition allocation to support
-      // seamless schema migration.
-
-      Iterator<Integer> it = 
kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap().keySet().iterator();
-      final Integer taskGroupId = (it.hasNext() ? 
getTaskGroupIdForPartition(it.next()) : null);
-
-      if (taskGroupId != null) {
-        // check to see if we already know about this task, either in 
[taskGroups] or in [pendingCompletionTaskGroups]
-        // and if not add it to taskGroups or pendingCompletionTaskGroups (if 
status = PUBLISHING)
-        TaskGroup taskGroup = taskGroups.get(taskGroupId);
-        if (!isTaskInPendingCompletionGroups(taskId) && (taskGroup == null || 
!taskGroup.tasks.containsKey(taskId))) {
-
-          futureTaskIds.add(taskId);
-          futures.add(
-              Futures.transform(
-                  taskClient.getStatusAsync(taskId), new 
Function<KafkaIndexTask.Status, Boolean>()
-                  {
-                    @Override
-                    public Boolean apply(KafkaIndexTask.Status status)
-                    {
-                      try {
-                        log.debug("Task [%s], status [%s]", taskId, status);
-                        if (status == KafkaIndexTask.Status.PUBLISHING) {
-                          
kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap().keySet().forEach(
-                              partition -> 
addDiscoveredTaskToPendingCompletionTaskGroups(
-                                  getTaskGroupIdForPartition(partition),
-                                  taskId,
-                                  kafkaTask.getIOConfig()
-                                           .getStartPartitions()
-                                           .getPartitionOffsetMap()
-                              )
-                          );
-
-                          // update partitionGroups with the publishing task's 
offsets (if they are greater than what is
-                          // existing) so that the next tasks will start 
reading from where this task left off
-                          Map<Integer, Long> publishingTaskEndOffsets = 
taskClient.getEndOffsets(taskId);
-
-                          for (Entry<Integer, Long> entry : 
publishingTaskEndOffsets.entrySet()) {
-                            Integer partition = entry.getKey();
-                            Long offset = entry.getValue();
-                            ConcurrentHashMap<Integer, Long> partitionOffsets 
= partitionGroups.get(
-                                getTaskGroupIdForPartition(partition)
-                            );
-
-                            boolean succeeded;
-                            do {
-                              succeeded = true;
-                              Long previousOffset = 
partitionOffsets.putIfAbsent(partition, offset);
-                              if (previousOffset != null && previousOffset < 
offset) {
-                                succeeded = 
partitionOffsets.replace(partition, previousOffset, offset);
-                              }
-                            } while (!succeeded);
-                          }
-                        } else {
-                          for (Integer partition : kafkaTask.getIOConfig()
-                                                            
.getStartPartitions()
-                                                            
.getPartitionOffsetMap()
-                                                            .keySet()) {
-                            if 
(!taskGroupId.equals(getTaskGroupIdForPartition(partition))) {
-                              log.warn(
-                                  "Stopping task [%s] which does not match the 
expected partition allocation",
-                                  taskId
-                              );
-                              try {
-                                stopTask(taskId, 
false).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
-                              }
-                              catch (InterruptedException | ExecutionException 
| TimeoutException e) {
-                                log.warn(e, "Exception while stopping task");
-                              }
-                              return false;
-                            }
-                          }
-                          // make sure the task's io and tuning configs match 
with the supervisor config
-                          // if it is current then only create corresponding 
taskGroup if it does not exist
-                          if (!isTaskCurrent(taskGroupId, taskId)) {
-                            log.info(
-                                "Stopping task [%s] which does not match the 
expected parameters and ingestion spec",
-                                taskId
-                            );
-                            try {
-                              stopTask(taskId, 
false).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
-                            }
-                            catch (InterruptedException | ExecutionException | 
TimeoutException e) {
-                              log.warn(e, "Exception while stopping task");
-                            }
-                            return false;
-                          } else {
-                            final TaskGroup taskGroup = 
taskGroups.computeIfAbsent(
-                                taskGroupId,
-                                k -> {
-                                  log.info("Creating a new task group for 
taskGroupId[%d]", taskGroupId);
-                                  return new TaskGroup(
-                                      taskGroupId,
-                                      ImmutableMap.copyOf(
-                                          
kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()
-                                      ),
-                                      
kafkaTask.getIOConfig().getMinimumMessageTime(),
-                                      
kafkaTask.getIOConfig().getMaximumMessageTime()
-                                  );
-                                }
-                            );
-                            taskGroupsToVerify.put(taskGroupId, taskGroup);
-                            final TaskData prevTaskData = 
taskGroup.tasks.putIfAbsent(taskId, new TaskData());
-                            if (prevTaskData != null) {
-                              throw new ISE(
-                                  "WTH? a taskData[%s] already exists for new 
task[%s]",
-                                  prevTaskData,
-                                  taskId
-                              );
-                            }
-                          }
-                        }
-                        return true;
-                      }
-                      catch (Throwable t) {
-                        log.error(t, "Something bad while discovering task 
[%s]", taskId);
-                        return null;
-                      }
-                    }
-                  }, workerExec
-              )
-          );
-        }
-      }
-    }
-
-    List<Boolean> results = 
Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
-    for (int i = 0; i < results.size(); i++) {
-      if (results.get(i) == null) {
-        String taskId = futureTaskIds.get(i);
-        killTask(taskId, "Task [%s] failed to return status, killing task", 
taskId);
-      }
-    }
-    log.debug("Found [%d] Kafka indexing tasks for dataSource [%s]", 
taskCount, dataSource);
-
-    // make sure the checkpoints are consistent with each other and with the 
metadata store
-    verifyAndMergeCheckpoints(taskGroupsToVerify.values());
-  }
-
-  private void verifyAndMergeCheckpoints(final Collection<TaskGroup> 
taskGroupsToVerify)
-  {
-    final List<ListenableFuture<?>> futures = new ArrayList<>();
-    for (TaskGroup taskGroup : taskGroupsToVerify) {
-      futures.add(workerExec.submit(() -> 
verifyAndMergeCheckpoints(taskGroup)));
-    }
-    try {
-      Futures.allAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
-    }
-    catch (InterruptedException | ExecutionException | TimeoutException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * This method does two things -
-   * 1. Makes sure the checkpoints information in the taskGroup is consistent 
with that of the tasks, if not kill
-   * inconsistent tasks.
-   * 2. truncates the checkpoints in the taskGroup corresponding to which 
segments have been published, so that any newly
-   * created tasks for the taskGroup start indexing from after the latest 
published offsets.
-   */
-  private void verifyAndMergeCheckpoints(final TaskGroup taskGroup)
-  {
-    final int groupId = taskGroup.groupId;
-    final List<Pair<String, TreeMap<Integer, Map<Integer, Long>>>> 
taskSequences = new ArrayList<>();
-    final List<ListenableFuture<TreeMap<Integer, Map<Integer, Long>>>> futures 
= new ArrayList<>();
-    final List<String> taskIds = new ArrayList<>();
-
-    for (String taskId : taskGroup.taskIds()) {
-      final ListenableFuture<TreeMap<Integer, Map<Integer, Long>>> 
checkpointsFuture = taskClient.getCheckpointsAsync(
-          taskId,
-          true
-      );
-      taskIds.add(taskId);
-      futures.add(checkpointsFuture);
-    }
-
-    try {
-      List<TreeMap<Integer, Map<Integer, Long>>> futuresResult = 
Futures.successfulAsList(futures)
-                                                                        
.get(futureTimeoutInSeconds, TimeUnit.SECONDS);
-
-      for (int i = 0; i < futuresResult.size(); i++) {
-        final TreeMap<Integer, Map<Integer, Long>> checkpoints = 
futuresResult.get(i);
-        final String taskId = taskIds.get(i);
-        if (checkpoints == null) {
-          try {
-            // catch the exception in failed futures
-            futures.get(i).get();
-          }
-          catch (Exception e) {
-            log.error(e, "Problem while getting checkpoints for task [%s], 
killing the task", taskId);
-            killTask(taskId, "Exception[%s] while getting checkpoints", 
e.getClass());
-            taskGroup.tasks.remove(taskId);
-          }
-        } else if (checkpoints.isEmpty()) {
-          log.warn("Ignoring task [%s], as probably it is not started running 
yet", taskId);
-        } else {
-          taskSequences.add(new Pair<>(taskId, checkpoints));
-        }
-      }
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    final KafkaDataSourceMetadata latestDataSourceMetadata = 
(KafkaDataSourceMetadata) indexerMetadataStorageCoordinator
-        .getDataSourceMetadata(dataSource);
-    final boolean hasValidOffsetsFromDb = latestDataSourceMetadata != null &&
-                                          
latestDataSourceMetadata.getKafkaPartitions() != null &&
-                                          ioConfig.getTopic().equals(
-                                              
latestDataSourceMetadata.getKafkaPartitions().getTopic()
-                                          );
-    final Map<Integer, Long> latestOffsetsFromDb;
-    if (hasValidOffsetsFromDb) {
-      latestOffsetsFromDb = 
latestDataSourceMetadata.getKafkaPartitions().getPartitionOffsetMap();
-    } else {
-      latestOffsetsFromDb = null;
-    }
-
-    // order tasks of this taskGroup by the latest sequenceId
-    taskSequences.sort((o1, o2) -> 
o2.rhs.firstKey().compareTo(o1.rhs.firstKey()));
-
-    final Set<String> tasksToKill = new HashSet<>();
-    final AtomicInteger earliestConsistentSequenceId = new AtomicInteger(-1);
-    int taskIndex = 0;
-
-    while (taskIndex < taskSequences.size()) {
-      TreeMap<Integer, Map<Integer, Long>> taskCheckpoints = 
taskSequences.get(taskIndex).rhs;
-      String taskId = taskSequences.get(taskIndex).lhs;
-      if (earliestConsistentSequenceId.get() == -1) {
-        // find the first replica task with earliest sequenceId consistent 
with datasource metadata in the metadata
-        // store
-        if (taskCheckpoints.entrySet().stream().anyMatch(
-            sequenceCheckpoint -> 
sequenceCheckpoint.getValue().entrySet().stream().allMatch(
-                partitionOffset -> Longs.compare(
-                    partitionOffset.getValue(),
-                    latestOffsetsFromDb == null ?
-                    partitionOffset.getValue() :
-                    latestOffsetsFromDb.getOrDefault(partitionOffset.getKey(), 
partitionOffset.getValue())
-                ) == 0) && earliestConsistentSequenceId.compareAndSet(-1, 
sequenceCheckpoint.getKey())) || (
-                someTaskGroupsPendingCompletion(groupId)
-                && earliestConsistentSequenceId.compareAndSet(-1, 
taskCheckpoints.firstKey()))) {
-          final SortedMap<Integer, Map<Integer, Long>> latestCheckpoints = new 
TreeMap<>(
-              taskCheckpoints.tailMap(earliestConsistentSequenceId.get())
-          );
-          log.info("Setting taskGroup sequences to [%s] for group [%d]", 
latestCheckpoints, groupId);
-          taskGroup.sequenceOffsets.clear();
-          taskGroup.sequenceOffsets.putAll(latestCheckpoints);
-        } else {
-          log.debug(
-              "Adding task [%s] to kill list, checkpoints[%s], latestoffsets 
from DB [%s]",
-              taskId,
-              taskCheckpoints,
-              latestOffsetsFromDb
-          );
-          tasksToKill.add(taskId);
-        }
-      } else {
-        // check consistency with taskGroup sequences
-        if (taskCheckpoints.get(taskGroup.sequenceOffsets.firstKey()) == null
-            || !(taskCheckpoints.get(taskGroup.sequenceOffsets.firstKey())
-                                
.equals(taskGroup.sequenceOffsets.firstEntry().getValue()))
-            || 
taskCheckpoints.tailMap(taskGroup.sequenceOffsets.firstKey()).size()
-               != taskGroup.sequenceOffsets.size()) {
-          log.debug(
-              "Adding task [%s] to kill list, checkpoints[%s], taskgroup 
checkpoints [%s]",
-              taskId,
-              taskCheckpoints,
-              taskGroup.sequenceOffsets
-          );
-          tasksToKill.add(taskId);
-        }
-      }
-      taskIndex++;
-    }
-
-    if ((tasksToKill.size() > 0 && tasksToKill.size() == 
taskGroup.tasks.size()) ||
-        (taskGroup.tasks.size() == 0 && 
!someTaskGroupsPendingCompletion(groupId))) {
-      // killing all tasks or no task left in the group ?
-      // clear state about the taskgroup so that get latest offset information 
is fetched from metadata store
-      log.warn("Clearing task group [%d] information as no valid tasks left 
the group", groupId);
-      taskGroups.remove(groupId);
-      partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
-    }
-
-    taskSequences.stream().filter(taskIdSequences -> 
tasksToKill.contains(taskIdSequences.lhs)).forEach(
-        sequenceCheckpoint -> {
-          killTask(
-              sequenceCheckpoint.lhs,
-              "Killing task [%s], as its checkpoints [%s] are not consistent 
with group checkpoints[%s] or latest "
-              + "persisted offsets in metadata store [%s]",
-              sequenceCheckpoint.lhs,
-              sequenceCheckpoint.rhs,
-              taskGroup.sequenceOffsets,
-              latestOffsetsFromDb
-          );
-          taskGroup.tasks.remove(sequenceCheckpoint.lhs);
-        }
-    );
-  }
-
-  private void addDiscoveredTaskToPendingCompletionTaskGroups(
-      int groupId,
-      String taskId,
-      Map<Integer, Long> startingPartitions
-  )
-  {
-    final CopyOnWriteArrayList<TaskGroup> taskGroupList = 
pendingCompletionTaskGroups.computeIfAbsent(
-        groupId,
-        k -> new CopyOnWriteArrayList<>()
-    );
-    for (TaskGroup taskGroup : taskGroupList) {
-      if (taskGroup.partitionOffsets.equals(startingPartitions)) {
-        if (taskGroup.tasks.putIfAbsent(taskId, new TaskData()) == null) {
-          log.info("Added discovered task [%s] to existing pending task group 
[%s]", taskId, groupId);
-        }
-        return;
-      }
-    }
-
-    log.info("Creating new pending completion task group [%s] for discovered 
task [%s]", groupId, taskId);
-
-    // reading the minimumMessageTime & maximumMessageTime from the publishing 
task and setting it here is not necessary as this task cannot
-    // change to a state where it will read any more events
-    TaskGroup newTaskGroup = new TaskGroup(
-        groupId,
-        ImmutableMap.copyOf(startingPartitions),
-        Optional.absent(),
-        Optional.absent()
-    );
-
-    newTaskGroup.tasks.put(taskId, new TaskData());
-    newTaskGroup.completionTimeout = 
DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
-
-    taskGroupList.add(newTaskGroup);
-  }
-
-  private void updateTaskStatus() throws ExecutionException, 
InterruptedException, TimeoutException
-  {
-    final List<ListenableFuture<Boolean>> futures = new ArrayList<>();
-    final List<String> futureTaskIds = new ArrayList<>();
-
-    // update status (and startTime if unknown) of current tasks in taskGroups
-    for (TaskGroup group : taskGroups.values()) {
-      for (Entry<String, TaskData> entry : group.tasks.entrySet()) {
-        final String taskId = entry.getKey();
-        final TaskData taskData = entry.getValue();
-
-        if (taskData.startTime == null) {
-          futureTaskIds.add(taskId);
-          futures.add(
-              Futures.transform(
-                  taskClient.getStartTimeAsync(taskId), new Function<DateTime, 
Boolean>()
-                  {
-                    @Nullable
-                    @Override
-                    public Boolean apply(@Nullable DateTime startTime)
-                    {
-                      if (startTime == null) {
-                        return false;
-                      }
-
-                      taskData.startTime = startTime;
-                      long millisRemaining = 
ioConfig.getTaskDuration().getMillis() -
-                                             (System.currentTimeMillis() - 
taskData.startTime.getMillis());
-                      if (millisRemaining > 0) {
-                        scheduledExec.schedule(
-                            buildRunTask(),
-                            millisRemaining + MAX_RUN_FREQUENCY_MILLIS,
-                            TimeUnit.MILLISECONDS
-                        );
-                      }
-
-                      return true;
-                    }
-                  }, workerExec
-              )
-          );
-        }
-
-        taskData.status = taskStorage.getStatus(taskId).get();
-      }
-    }
-
-    // update status of pending completion tasks in pendingCompletionTaskGroups
-    for (List<TaskGroup> taskGroups : pendingCompletionTaskGroups.values()) {
-      for (TaskGroup group : taskGroups) {
-        for (Entry<String, TaskData> entry : group.tasks.entrySet()) {
-          entry.getValue().status = 
taskStorage.getStatus(entry.getKey()).get();
-        }
-      }
-    }
-
-    List<Boolean> results = 
Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
-    for (int i = 0; i < results.size(); i++) {
-      // false means the task hasn't started running yet and that's okay; null 
means it should be running but the HTTP
-      // request threw an exception so kill the task
-      if (results.get(i) == null) {
-        String taskId = futureTaskIds.get(i);
-        killTask(taskId, "Task [%s] failed to return start time, killing 
task", taskId);
-      }
-    }
-  }
-
-  private void checkTaskDuration() throws InterruptedException, 
ExecutionException, TimeoutException
-  {
-    final List<ListenableFuture<Map<Integer, Long>>> futures = new 
ArrayList<>();
-    final List<Integer> futureGroupIds = new ArrayList<>();
-
-    for (Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
-      Integer groupId = entry.getKey();
-      TaskGroup group = entry.getValue();
-
-      // find the longest running task from this group
-      DateTime earliestTaskStart = DateTimes.nowUtc();
-      for (TaskData taskData : group.tasks.values()) {
-        // startTime can be null if kafkaSupervisor is stopped gracefully 
before processing any runNotice
-        if (taskData.startTime != null && 
earliestTaskStart.isAfter(taskData.startTime)) {
-          earliestTaskStart = taskData.startTime;
-        }
-      }
-
-      // if this task has run longer than the configured duration, signal all 
tasks in the group to persist
-      if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
-        log.info("Task group [%d] has run for [%s]", groupId, 
ioConfig.getTaskDuration());
-        futureGroupIds.add(groupId);
-        futures.add(checkpointTaskGroup(group, true));
-      }
-    }
-
-    List<Map<Integer, Long>> results = 
Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
-    for (int j = 0; j < results.size(); j++) {
-      Integer groupId = futureGroupIds.get(j);
-      TaskGroup group = taskGroups.get(groupId);
-      Map<Integer, Long> endOffsets = results.get(j);
-
-      if (endOffsets != null) {
-        // set a timeout and put this group in pendingCompletionTaskGroups so 
that it can be monitored for completion
-        group.completionTimeout = 
DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
-        pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new 
CopyOnWriteArrayList<>()).add(group);
-
-        // set endOffsets as the next startOffsets
-        for (Entry<Integer, Long> entry : endOffsets.entrySet()) {
-          partitionGroups.get(groupId).put(entry.getKey(), entry.getValue());
-        }
-      } else {
-        for (String id : group.taskIds()) {
-          killTask(
-              id,
-              "All tasks in group [%s] failed to transition to publishing 
state",
-              groupId
-          );
-        }
-        // clear partitionGroups, so that latest offsets from db is used as 
start offsets not the stale ones
-        // if tasks did some successful incremental handoffs
-        partitionGroups.get(groupId).replaceAll((partition, offset) -> 
NOT_SET);
-      }
-
-      // remove this task group from the list of current task groups now that 
it has been handled
-      taskGroups.remove(groupId);
-    }
-  }
-
-  private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final 
TaskGroup taskGroup, final boolean finalize)
-  {
-    if (finalize) {
-      // 1) Check if any task completed (in which case we're done) and kill 
unassigned tasks
-      Iterator<Entry<String, TaskData>> i = 
taskGroup.tasks.entrySet().iterator();
-      while (i.hasNext()) {
-        Entry<String, TaskData> taskEntry = i.next();
-        String taskId = taskEntry.getKey();
-        TaskData task = taskEntry.getValue();
-
-        // task.status can be null if kafkaSupervisor is stopped gracefully 
before processing any runNotice.
-        if (task.status != null) {
-          if (task.status.isSuccess()) {
-            // If any task in this group has already completed, stop the rest 
of the tasks in the group and return.
-            // This will cause us to create a new set of tasks next cycle that 
will start from the offsets in
-            // metadata store (which will have advanced if we succeeded in 
publishing and will remain the same if
-            // publishing failed and we need to re-ingest)
-            return Futures.transform(
-                stopTasksInGroup(taskGroup, "task[%s] succeeded in the 
taskGroup", task.status.getId()),
-                new Function<Object, Map<Integer, Long>>()
-                {
-                  @Nullable
-                  @Override
-                  public Map<Integer, Long> apply(@Nullable Object input)
-                  {
-                    return null;
-                  }
-                }
-            );
-          }
-
-          if (task.status.isRunnable()) {
-            if 
(taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
-              killTask(taskId, "Killing task [%s] which hasn't been assigned 
to a worker", taskId);
-              i.remove();
-            }
-          }
-        }
-      }
-    }
-
-    // 2) Pause running tasks
-    final List<ListenableFuture<Map<Integer, Long>>> pauseFutures = new 
ArrayList<>();
-    final List<String> pauseTaskIds = 
ImmutableList.copyOf(taskGroup.taskIds());
-    for (final String taskId : pauseTaskIds) {
-      pauseFutures.add(taskClient.pauseAsync(taskId));
-    }
-
-    return Futures.transform(
-        Futures.successfulAsList(pauseFutures), new Function<List<Map<Integer, 
Long>>, Map<Integer, Long>>()
-        {
-          @Nullab
 
 Review comment:
   checkstyle complains
   ```
    According to the Druid code style, if a method or constructor declaration 
or a call
    doesn't fit a single line, each parameter or argument should be on it's 
own, e. g:
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to