jihoonson closed pull request #6512: Fix NPE in TaskLockbox that prevents
overlord leadership
URL: https://github.com/apache/incubator-druid/pull/6512
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
index 2c836bd2066..808fdb79729 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
@@ -190,9 +190,11 @@ public void setStatus(final TaskStatus status)
@Override
public List<Task> getActiveTasks()
{
+ // filter out taskInfo with a null 'task' which should only happen in
practice if we are missing a jackson module
+ // and don't know what to do with the payload, so we won't be able to make
use of it anyway
return handler.getActiveTaskInfo(null)
.stream()
- .filter(taskInfo -> taskInfo.getStatus().isRunnable())
+ .filter(taskInfo -> taskInfo.getStatus().isRunnable() &&
taskInfo.getTask() != null)
.map(TaskInfo::getTask)
.collect(Collectors.toList());
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index e20df5a7a7c..790e00f0846 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -23,11 +23,16 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.Iterables;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
+import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.jackson.DefaultObjectMapper;
@@ -342,6 +347,46 @@ public void testSyncFromStorageWithInvalidPriority()
throws EntryExistsException
lockbox.syncFromStorage();
}
+ @Test
+ public void testSyncWithUnknownTaskTypesFromModuleNotLoaded() throws
Exception
+ {
+ // ensure that if we don't know how to deserialize a task it won't explode
the lockbox
+ // (or anything else that uses taskStorage.getActiveTasks() and doesn't
expect null which is most things)
+ final TestDerbyConnector derbyConnector = derby.getConnector();
+ ObjectMapper loadedMapper = new DefaultObjectMapper().registerModule(new
TheModule());
+ TaskStorage loadedTaskStorage = new MetadataTaskStorage(
+ derbyConnector,
+ new TaskStorageConfig(null),
+ new DerbyMetadataStorageActionHandlerFactory(
+ derbyConnector,
+ derby.metadataTablesConfigSupplier().get(),
+ loadedMapper
+ )
+ );
+
+ TaskLockbox theBox = new TaskLockbox(taskStorage);
+ TaskLockbox loadedBox = new TaskLockbox(loadedTaskStorage);
+
+ Task aTask = NoopTask.create();
+ taskStorage.insert(aTask, TaskStatus.running(aTask.getId()));
+ theBox.add(aTask);
+ loadedBox.add(aTask);
+
+ Task theTask = new MyModuleIsntLoadedTask("1", "yey", null, "foo");
+ loadedTaskStorage.insert(theTask, TaskStatus.running(theTask.getId()));
+ theBox.add(theTask);
+ loadedBox.add(theTask);
+
+ List<Task> tasks = taskStorage.getActiveTasks();
+ List<Task> tasksFromLoaded = loadedTaskStorage.getActiveTasks();
+
+ theBox.syncFromStorage();
+ loadedBox.syncFromStorage();
+
+ Assert.assertEquals(1, tasks.size());
+ Assert.assertEquals(2, tasksFromLoaded.size());
+ }
+
@Test
public void testRevokedLockSyncFromStorage() throws EntryExistsException
{
@@ -648,4 +693,55 @@ public boolean isRevoked()
return super.isRevoked();
}
}
+
+ private static String TASK_NAME = "myModuleIsntLoadedTask";
+ private static class TheModule extends SimpleModule
+ {
+ public TheModule()
+ {
+
+ registerSubtypes(new NamedType(MyModuleIsntLoadedTask.class, TASK_NAME));
+ }
+ }
+
+ private static class MyModuleIsntLoadedTask extends AbstractTask
+ {
+ private String someProp;
+
+ @JsonCreator
+ protected MyModuleIsntLoadedTask(
+ @JsonProperty("id") String id,
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("context") Map<String, Object> context,
+ @JsonProperty("someProp") String someProp
+ )
+ {
+ super(id, dataSource, context);
+ this.someProp = someProp;
+ }
+
+ @JsonProperty
+ public String getSomeProp()
+ {
+ return someProp;
+ }
+
+ @Override
+ public String getType()
+ {
+ return TASK_NAME;
+ }
+
+ @Override
+ public boolean isReady(TaskActionClient taskActionClient)
+ {
+ return true;
+ }
+
+ @Override
+ public TaskStatus run(TaskToolbox toolbox)
+ {
+ return TaskStatus.failure("how?");
+ }
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]