leventov commented on a change in pull request #4815: Kafka Index Task that 
supports Incremental handoffs
URL: https://github.com/apache/incubator-druid/pull/4815#discussion_r226462114
 
 

 ##########
 File path: 
extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
 ##########
 @@ -946,6 +1107,150 @@ public Boolean apply(KafkaIndexTask.Status status)
       }
     }
     log.debug("Found [%d] Kafka indexing tasks for dataSource [%s]", 
taskCount, dataSource);
+
+    // make sure the checkpoints are consistent with each other and with the 
metadata store
+    taskGroupsToVerify.forEach(this::verifyAndMergeCheckpoints);
+  }
+
+  /**
+   * This method does two things -
+   * 1. Makes sure the checkpoints information in the taskGroup is consistent 
with that of the tasks, if not kill
+   * inconsistent tasks.
+   * 2. truncates the checkpoints in the taskGroup corresponding to which 
segments have been published, so that any newly
+   * created tasks for the taskGroup start indexing from after the latest 
published offsets.
+   */
+  private void verifyAndMergeCheckpoints(final Integer groupId)
+  {
+    final TaskGroup taskGroup = taskGroups.get(groupId);
+
+    // List<TaskId, Map -> {SequenceId, Checkpoints}>
+    final List<Pair<String, TreeMap<Integer, Map<Integer, Long>>>> 
taskSequences = new CopyOnWriteArrayList<>();
+    final List<ListenableFuture<TreeMap<Integer, Map<Integer, Long>>>> futures 
= new ArrayList<>();
+
+    for (String taskId : taskGroup.taskIds()) {
+      final ListenableFuture<TreeMap<Integer, Map<Integer, Long>>> 
checkpointsFuture = taskClient.getCheckpointsAsync(
+          taskId,
+          true
+      );
+      futures.add(checkpointsFuture);
+      Futures.addCallback(
+          checkpointsFuture,
+          new FutureCallback<TreeMap<Integer, Map<Integer, Long>>>()
+          {
+            @Override
+            public void onSuccess(TreeMap<Integer, Map<Integer, Long>> 
checkpoints)
+            {
+              if (!checkpoints.isEmpty()) {
+                taskSequences.add(new Pair<>(taskId, checkpoints));
+              } else {
+                log.warn("Ignoring task [%s], as probably it is not started 
running yet", taskId);
+              }
+            }
+
+            @Override
+            public void onFailure(Throwable t)
+            {
+              log.error(t, "Problem while getting checkpoints for task [%s], 
killing the task", taskId);
+              killTask(taskId);
+              taskGroup.tasks.remove(taskId);
+            }
+          }
+      );
+    }
+
+    try {
+      Futures.allAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
+    }
+    catch (Exception e) {
+      Throwables.propagate(e);
+    }
+
+    final KafkaDataSourceMetadata latestDataSourceMetadata = 
(KafkaDataSourceMetadata) indexerMetadataStorageCoordinator
+        .getDataSourceMetadata(dataSource);
+    final Map<Integer, Long> latestOffsetsFromDb = (latestDataSourceMetadata 
== null
+                                                    || 
latestDataSourceMetadata.getKafkaPartitions() == null) ? null
+                                                                               
                               : latestDataSourceMetadata
+                                                       .getKafkaPartitions()
+                                                       
.getPartitionOffsetMap();
+
+    // order tasks of this taskGroup by the latest sequenceId
+    taskSequences.sort((o1, o2) -> 
o2.rhs.firstKey().compareTo(o1.rhs.firstKey()));
+
+    final Set<String> tasksToKill = new HashSet<>();
+    final AtomicInteger earliestConsistentSequenceId = new AtomicInteger(-1);
+    int taskIndex = 0;
+
+    while (taskIndex < taskSequences.size()) {
+      if (earliestConsistentSequenceId.get() == -1) {
+        // find the first replica task with earliest sequenceId consistent 
with datasource metadata in the metadata store
+        if (taskSequences.get(taskIndex).rhs.entrySet().stream().anyMatch(
 
 Review comment:
   @pjain1 could you please refactor this `if` statement, which has side effects, into a 
method that returns a boolean? As written, this code is very hard to understand.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to