jihoonson closed pull request #6512: Fix NPE in TaskLockbox that prevents overlord leadership
URL: https://github.com/apache/incubator-druid/pull/6512
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
index 2c836bd2066..808fdb79729 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
@@ -190,9 +190,11 @@ public void setStatus(final TaskStatus status)
   @Override
   public List<Task> getActiveTasks()
   {
+    // filter out taskInfo with a null 'task' which should only happen in practice if we are missing a jackson module
+    // and don't know what to do with the payload, so we won't be able to make use of it anyway
     return handler.getActiveTaskInfo(null)
            .stream()
-           .filter(taskInfo -> taskInfo.getStatus().isRunnable())
+           .filter(taskInfo -> taskInfo.getStatus().isRunnable() && taskInfo.getTask() != null)
            .map(TaskInfo::getTask)
            .collect(Collectors.toList());
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index e20df5a7a7c..790e00f0846 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -23,11 +23,16 @@
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.common.collect.Iterables;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
+import org.apache.druid.indexing.common.task.AbstractTask;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.jackson.DefaultObjectMapper;
@@ -342,6 +347,46 @@ public void testSyncFromStorageWithInvalidPriority() throws EntryExistsException
     lockbox.syncFromStorage();
   }
 
+  @Test
+  public void testSyncWithUnknownTaskTypesFromModuleNotLoaded() throws Exception
+  {
+    // ensure that if we don't know how to deserialize a task it won't explode the lockbox
+    // (or anything else that uses taskStorage.getActiveTasks() and doesn't expect null which is most things)
+    final TestDerbyConnector derbyConnector = derby.getConnector();
+    ObjectMapper loadedMapper = new DefaultObjectMapper().registerModule(new TheModule());
+    TaskStorage loadedTaskStorage = new MetadataTaskStorage(
+        derbyConnector,
+        new TaskStorageConfig(null),
+        new DerbyMetadataStorageActionHandlerFactory(
+            derbyConnector,
+            derby.metadataTablesConfigSupplier().get(),
+            loadedMapper
+        )
+    );
+
+    TaskLockbox theBox = new TaskLockbox(taskStorage);
+    TaskLockbox loadedBox = new TaskLockbox(loadedTaskStorage);
+
+    Task aTask = NoopTask.create();
+    taskStorage.insert(aTask, TaskStatus.running(aTask.getId()));
+    theBox.add(aTask);
+    loadedBox.add(aTask);
+
+    Task theTask = new MyModuleIsntLoadedTask("1", "yey", null, "foo");
+    loadedTaskStorage.insert(theTask, TaskStatus.running(theTask.getId()));
+    theBox.add(theTask);
+    loadedBox.add(theTask);
+
+    List<Task> tasks = taskStorage.getActiveTasks();
+    List<Task> tasksFromLoaded = loadedTaskStorage.getActiveTasks();
+
+    theBox.syncFromStorage();
+    loadedBox.syncFromStorage();
+
+    Assert.assertEquals(1, tasks.size());
+    Assert.assertEquals(2, tasksFromLoaded.size());
+  }
+
   @Test
   public void testRevokedLockSyncFromStorage() throws EntryExistsException
   {
@@ -648,4 +693,55 @@ public boolean isRevoked()
       return super.isRevoked();
     }
   }
+
+  private static String TASK_NAME = "myModuleIsntLoadedTask";
+  private static class TheModule extends SimpleModule
+  {
+    public TheModule()
+    {
+
+      registerSubtypes(new NamedType(MyModuleIsntLoadedTask.class, TASK_NAME));
+    }
+  }
+
+  private static class MyModuleIsntLoadedTask extends AbstractTask
+  {
+    private String someProp;
+
+    @JsonCreator
+    protected MyModuleIsntLoadedTask(
+        @JsonProperty("id") String id,
+        @JsonProperty("dataSource") String dataSource,
+        @JsonProperty("context") Map<String, Object> context,
+        @JsonProperty("someProp") String someProp
+    )
+    {
+      super(id, dataSource, context);
+      this.someProp = someProp;
+    }
+
+    @JsonProperty
+    public String getSomeProp()
+    {
+      return someProp;
+    }
+
+    @Override
+    public String getType()
+    {
+      return TASK_NAME;
+    }
+
+    @Override
+    public boolean isReady(TaskActionClient taskActionClient)
+    {
+      return true;
+    }
+
+    @Override
+    public TaskStatus run(TaskToolbox toolbox)
+    {
+      return TaskStatus.failure("how?");
+    }
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to