gianm closed pull request #5972: [Backport] Prevent KafkaSupervisor NPE in generateSequenceName
URL: https://github.com/apache/incubator-druid/pull/5972

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:


diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 7cc79f6a98a..5eb783c43c8 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -143,7 +143,8 @@
   * time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups]
   * map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]).
    */
-  private static class TaskGroup
+  @VisibleForTesting
+  static class TaskGroup
   {
     // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
     // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
@@ -758,8 +759,8 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
           resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition -> {
             final int groupId = getTaskGroupIdForPartition(partition);
             killTaskGroupForPartitions(ImmutableSet.of(partition));
-            sequenceTaskGroup.remove(generateSequenceName(groupId));
-            taskGroups.remove(groupId);
+            final TaskGroup removedGroup = taskGroups.remove(groupId);
+            sequenceTaskGroup.remove(generateSequenceName(removedGroup));
            partitionGroups.get(groupId).replaceAll((partitionId, offset) -> NOT_SET);
           });
         } else {
@@ -867,12 +868,13 @@ String generateSequenceName(
   }
 
   @VisibleForTesting
-  String generateSequenceName(int groupId)
+  String generateSequenceName(TaskGroup taskGroup)
   {
+    Preconditions.checkNotNull(taskGroup, "taskGroup cannot be null");
     return generateSequenceName(
-        taskGroups.get(groupId).partitionOffsets,
-        taskGroups.get(groupId).minimumMessageTime,
-        taskGroups.get(groupId).maximumMessageTime
+        taskGroup.partitionOffsets,
+        taskGroup.minimumMessageTime,
+        taskGroup.maximumMessageTime
     );
   }
 
@@ -1066,18 +1068,19 @@ public Boolean apply(KafkaIndexTask.Status status)
                             }
                             return false;
                           } else {
+                            final TaskGroup taskGroup = new TaskGroup(
+                                ImmutableMap.copyOf(
+                                    kafkaTask.getIOConfig()
+                                             .getStartPartitions()
+                                             .getPartitionOffsetMap()
+                                ), kafkaTask.getIOConfig().getMinimumMessageTime(),
+                                kafkaTask.getIOConfig().getMaximumMessageTime()
+                            );
                             if (taskGroups.putIfAbsent(
                                 taskGroupId,
-                                new TaskGroup(
-                                    ImmutableMap.copyOf(
-                                        kafkaTask.getIOConfig()
-                                                 .getStartPartitions()
-                                                 .getPartitionOffsetMap()
-                                    ), kafkaTask.getIOConfig().getMinimumMessageTime(),
-                                    kafkaTask.getIOConfig().getMaximumMessageTime()
-                                )
+                                taskGroup
                             ) == null) {
-                              sequenceTaskGroup.put(generateSequenceName(taskGroupId), taskGroups.get(taskGroupId));
+                              sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(taskGroupId));
                              log.info("Created new task group [%d]", taskGroupId);
                             }
                             taskGroupsToVerify.add(taskGroupId);
@@ -1234,7 +1237,7 @@ public void onFailure(Throwable t)
       // killing all tasks or no task left in the group ?
      // clear state about the taskgroup so that get latest offset information is fetched from metadata store
      log.warn("Clearing task group [%d] information as no valid tasks left the group", groupId);
-      sequenceTaskGroup.remove(generateSequenceName(groupId));
+      sequenceTaskGroup.remove(generateSequenceName(taskGroup));
       taskGroups.remove(groupId);
       partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
     }
@@ -1410,7 +1413,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
        partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
       }
 
-      sequenceTaskGroup.remove(generateSequenceName(groupId));
+      sequenceTaskGroup.remove(generateSequenceName(group));
      // remove this task group from the list of current task groups now that it has been handled
       taskGroups.remove(groupId);
     }
@@ -1612,8 +1615,7 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
 
          // reset partitions offsets for this task group so that they will be re-read from metadata storage
          partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
-          sequenceTaskGroup.remove(generateSequenceName(groupId));
-
+          sequenceTaskGroup.remove(generateSequenceName(group));
           // kill all the tasks in this pending completion group
           killTasksInGroup(group);
          // set a flag so the other pending completion groups for this set of partitions will also stop
@@ -1673,7 +1675,7 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep
         // be recreated with the next set of offsets
         if (taskData.status.isSuccess()) {
           futures.add(stopTasksInGroup(taskGroup));
-          sequenceTaskGroup.remove(generateSequenceName(groupId));
+          sequenceTaskGroup.remove(generateSequenceName(taskGroup));
           iTaskGroups.remove();
           break;
         }
@@ -1706,15 +1708,16 @@ void createNewTasks() throws JsonProcessingException
            DateTimes.nowUtc().plus(ioConfig.getTaskDuration()).plus(ioConfig.getEarlyMessageRejectionPeriod().get())
         ) : Optional.<DateTime>absent());
 
+        final TaskGroup taskGroup = new TaskGroup(
+            generateStartingOffsetsForPartitionGroup(groupId),
+            minimumMessageTime,
+            maximumMessageTime
+        );
         taskGroups.put(
             groupId,
-            new TaskGroup(
-                generateStartingOffsetsForPartitionGroup(groupId),
-                minimumMessageTime,
-                maximumMessageTime
-            )
+            taskGroup
         );
-        sequenceTaskGroup.put(generateSequenceName(groupId), taskGroups.get(groupId));
+        sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(groupId));
       }
     }
 
@@ -1749,8 +1752,8 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc
     for (Integer partition : startPartitions.keySet()) {
       endPartitions.put(partition, Long.MAX_VALUE);
     }
-
-    String sequenceName = generateSequenceName(groupId);
+    TaskGroup group = taskGroups.get(groupId);
+    String sequenceName = generateSequenceName(group);
 
    Map<String, String> consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties());
    DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull();
@@ -1911,7 +1914,7 @@ private boolean isTaskCurrent(int taskGroupId, String taskId)
 
    String taskSequenceName = ((KafkaIndexTask) taskOptional.get()).getIOConfig().getBaseSequenceName();
     if (taskGroups.get(taskGroupId) != null) {
-      return generateSequenceName(taskGroupId).equals(taskSequenceName);
+      return generateSequenceName(taskGroups.get(taskGroupId)).equals(taskSequenceName);
     } else {
       return generateSequenceName(
           ((KafkaIndexTask) taskOptional.get()).getIOConfig()
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 29041e27de5..3c2b1989571 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -2087,12 +2087,6 @@ public TestableKafkaSupervisor(
      super(taskStorage, taskMaster, indexerMetadataStorageCoordinator, taskClientFactory, mapper, spec);
     }
 
-    @Override
-    protected String generateSequenceName(int groupId)
-    {
-      return StringUtils.format("sequenceName-%d", groupId);
-    }
-
     @Override
     protected String generateSequenceName(
         Map<Integer, Long> startPartitions,
@@ -2100,7 +2094,8 @@ protected String generateSequenceName(
         Optional<DateTime> maximumMessageTime
     )
     {
-      return generateSequenceName(getTaskGroupIdForPartition(startPartitions.keySet().iterator().next()));
+      final int groupId = getTaskGroupIdForPartition(startPartitions.keySet().iterator().next());
+      return StringUtils.format("sequenceName-%d", groupId);
     }
   }
 }
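
For context, here is a minimal sketch (not the actual KafkaSupervisor code) of the failure mode the diff above addresses. Only taskGroups, generateSequenceName, and the Guava Preconditions call correspond to the real class; the TaskGroup fields, the map type, and the sequence-name format are simplified stand-ins for illustration:

    import com.google.common.base.Preconditions;
    import java.util.concurrent.ConcurrentHashMap;

    class SequenceNameSketch
    {
      // Simplified stand-in for KafkaSupervisor.TaskGroup.
      static class TaskGroup
      {
        final String partitionOffsets;

        TaskGroup(String partitionOffsets)
        {
          this.partitionOffsets = partitionOffsets;
        }
      }

      private final ConcurrentHashMap<Integer, TaskGroup> taskGroups = new ConcurrentHashMap<>();

      // Before: the group is re-looked-up by id inside the method. If the id was
      // already removed from taskGroups (e.g. earlier in the same loop, when several
      // partitions map to the same group), taskGroups.get(groupId) returns null and
      // the field access throws an unexplained NullPointerException.
      String generateSequenceNameOld(int groupId)
      {
        return "sequenceName-" + taskGroups.get(groupId).partitionOffsets;
      }

      // After: callers resolve the TaskGroup once (for example via
      // taskGroups.remove(groupId)) and pass the object in, so a missing group
      // fails fast with an explicit message at the call boundary.
      String generateSequenceNameNew(TaskGroup taskGroup)
      {
        Preconditions.checkNotNull(taskGroup, "taskGroup cannot be null");
        return "sequenceName-" + taskGroup.partitionOffsets;
      }
    }

This mirrors the patch's change of generateSequenceName(int groupId) to generateSequenceName(TaskGroup taskGroup) plus the added Preconditions.checkNotNull.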


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
