jon-wei commented on a change in pull request #7212: Support Kafka supervisor 
adopting running tasks between versions 
URL: https://github.com/apache/incubator-druid/pull/7212#discussion_r272769658
 
 

 ##########
 File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 ##########
 @@ -1630,30 +1700,48 @@ public Void apply(@Nullable Boolean result)
     );
   }
 
-  private boolean isTaskCurrent(int taskGroupId, String taskId)
+  @VisibleForTesting
+  public boolean isTaskCurrent(int taskGroupId, String taskId)
   {
     Optional<Task> taskOptional = taskStorage.getTask(taskId);
     if (!taskOptional.isPresent() || 
!doesTaskTypeMatchSupervisor(taskOptional.get())) {
       return false;
     }
 
     @SuppressWarnings("unchecked")
-    SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task = 
(SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>) taskOptional
-        .get();
+    SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task =
+        (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>) 
taskOptional.get();
+
+    // We recompute the sequence name hash for the supervisor's own 
configuration and compare this to the hash created
+    // by rehashing the task's sequence name using the most up-to-date class 
definitions of tuning config and
+    // data schema. Recomputing both ensures that forwards-compatible tasks 
won't be killed (which would occur
+    // if the hash generated using the old class definitions was used).
+    String taskSequenceName = generateSequenceName(
+        
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(),
+        task.getIOConfig().getMinimumMessageTime(),
+        task.getIOConfig().getMaximumMessageTime(),
+        task.getDataSchema(),
+        task.getTuningConfig()
+    );
 
-    String taskSequenceName = task.getIOConfig().getBaseSequenceName();
     if (activelyReadingTaskGroups.get(taskGroupId) != null) {
-      return Preconditions
-          .checkNotNull(activelyReadingTaskGroups.get(taskGroupId), "null 
taskGroup for taskId[%s]", taskGroupId)
-          .baseSequenceName
-          .equals(taskSequenceName);
+      TaskGroup taskGroup = activelyReadingTaskGroups.get(taskGroupId);
 
 Review comment:
   Oh, never mind. I see it was redundant.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to