jihoonson closed pull request #6106: [Backport] Fix IllegalArgumentException in
TaskLockBox.syncFromStorage() when updating from 0.12.x to 0.12.2
URL: https://github.com/apache/incubator-druid/pull/6106
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
from a fork) once it is merged, the diff is reproduced below for the
sake of provenance:
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 76468b67721..5f039c0336b 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -299,9 +299,9 @@ private static String makeTaskId(String dataSource, int
randomBits)
}
@Override
- public int getDefaultPriority()
+ public int getPriority()
{
- return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
+ return getContextValue(Tasks.PRIORITY_KEY,
Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
}
@Override
diff --git
a/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java
b/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java
index 5c7ca14b30d..273335ed654 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java
@@ -37,9 +37,22 @@
private final String dataSource;
private final Interval interval;
private final String version;
- private final int priority;
+ private final Integer priority;
private final boolean revoked;
+ public static TaskLock withPriority(TaskLock lock, int priority)
+ {
+ return new TaskLock(
+ lock.type,
+ lock.getGroupId(),
+ lock.getDataSource(),
+ lock.getInterval(),
+ lock.getVersion(),
+ priority,
+ lock.isRevoked()
+ );
+ }
+
@JsonCreator
public TaskLock(
@JsonProperty("type") @Nullable TaskLockType type, //
nullable for backward compatibility
@@ -47,7 +60,7 @@ public TaskLock(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("version") String version,
- @JsonProperty("priority") int priority,
+ @JsonProperty("priority") @Nullable Integer priority,
@JsonProperty("revoked") boolean revoked
)
{
@@ -116,11 +129,17 @@ public String getVersion()
}
@JsonProperty
- public int getPriority()
+ @Nullable
+ public Integer getPriority()
{
return priority;
}
+ public int getNonNullPriority()
+ {
+ return Preconditions.checkNotNull(priority, "priority");
+ }
+
@JsonProperty
public boolean isRevoked()
{
@@ -139,7 +158,7 @@ public boolean equals(Object o)
this.dataSource.equals(that.dataSource) &&
this.interval.equals(that.interval) &&
this.version.equals(that.version) &&
- this.priority == that.priority &&
+ Objects.equal(this.priority, that.priority) &&
this.revoked == that.revoked;
}
}
diff --git
a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
index ad776011043..5e94285cd9f 100644
---
a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
+++
b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
@@ -160,9 +160,9 @@ public String getType()
}
@Override
- public int getDefaultPriority()
+ public int getPriority()
{
- return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
+ return getContextValue(Tasks.PRIORITY_KEY,
Tasks.DEFAULT_MERGE_TASK_PRIORITY);
}
@VisibleForTesting
diff --git
a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 6a08a82811b..4386b5b3595 100644
---
a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++
b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -121,9 +121,9 @@ public HadoopIndexTask(
}
@Override
- public int getDefaultPriority()
+ public int getPriority()
{
- return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
+ return getContextValue(Tasks.PRIORITY_KEY,
Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
}
@Override
diff --git
a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index 6cbc27811d9..b9ec2705510 100644
---
a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -170,9 +170,9 @@ public IndexTask(
}
@Override
- public int getDefaultPriority()
+ public int getPriority()
{
- return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
+ return getContextValue(Tasks.PRIORITY_KEY,
Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
}
@Override
diff --git
a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
index 3a9fc979394..acf1be019ca 100644
---
a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
+++
b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
@@ -133,9 +133,9 @@ public boolean apply(@Nullable DataSegment segment)
}
@Override
- public int getDefaultPriority()
+ public int getPriority()
{
- return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
+ return getContextValue(Tasks.PRIORITY_KEY,
Tasks.DEFAULT_MERGE_TASK_PRIORITY);
}
@Override
diff --git
a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
index 6c6358df4be..e2027747bf5 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
@@ -152,9 +152,9 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
}
@Override
- public int getDefaultPriority()
+ public int getPriority()
{
- return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
+ return getContextValue(Tasks.PRIORITY_KEY,
Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
}
public static NoopTask create()
diff --git
a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index 053df113ec9..b3a12f79c87 100644
---
a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++
b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -162,9 +162,9 @@ public RealtimeIndexTask(
}
@Override
- public int getDefaultPriority()
+ public int getPriority()
{
- return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
+ return getContextValue(Tasks.PRIORITY_KEY,
Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
}
@Override
diff --git
a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
index c69b6b56e25..1c7a134eb84 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
@@ -97,14 +97,6 @@ default int getPriority()
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_TASK_PRIORITY);
}
- /**
- * Returns the default task priority. It can vary depending on the task type.
- */
- default int getDefaultPriority()
- {
- return Tasks.DEFAULT_TASK_PRIORITY;
- }
-
/**
* Returns a {@link TaskResource} for this task. Task resources define
specific worker requirements a task may
* require.
diff --git
a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java
b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java
index 286296d1c4c..8488257a7ee 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java
@@ -130,17 +130,23 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task,
TaskLock> right)
final TaskLock savedTaskLock = taskAndLock.rhs;
if (savedTaskLock.getInterval().toDurationMillis() <= 0) {
// "Impossible", but you never know what crazy stuff can be restored
from storage.
- log.warn("WTF?! Got lock with empty interval for task: %s",
task.getId());
+ log.warn("WTF?! Got lock[%s] with empty interval for task: %s",
savedTaskLock, task.getId());
continue;
}
- final TaskLockPosse taskLockPosse = createOrFindLockPosse(task,
savedTaskLock);
+ // Create a new taskLock if it doesn't have a proper priority,
+ // so that every taskLock in memory has the priority.
+ final TaskLock savedTaskLockWithPriority = savedTaskLock.getPriority()
== null
+ ? TaskLock.withPriority(savedTaskLock,
task.getPriority())
+ : savedTaskLock;
+
+ final TaskLockPosse taskLockPosse = createOrFindLockPosse(task,
savedTaskLockWithPriority);
if (taskLockPosse != null) {
taskLockPosse.addTask(task);
final TaskLock taskLock = taskLockPosse.getTaskLock();
- if (savedTaskLock.getVersion().equals(taskLock.getVersion())) {
+ if
(savedTaskLockWithPriority.getVersion().equals(taskLock.getVersion())) {
taskLockCount++;
log.info(
"Reacquired lock[%s] for task: %s",
@@ -151,8 +157,8 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task,
TaskLock> right)
taskLockCount++;
log.info(
"Could not reacquire lock on interval[%s] version[%s] (got
version[%s] instead) for task: %s",
- savedTaskLock.getInterval(),
- savedTaskLock.getVersion(),
+ savedTaskLockWithPriority.getInterval(),
+ savedTaskLockWithPriority.getVersion(),
taskLock.getVersion(),
task.getId()
);
@@ -160,8 +166,8 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task,
TaskLock> right)
} else {
throw new ISE(
"Could not reacquire lock on interval[%s] version[%s] for task:
%s",
- savedTaskLock.getInterval(),
- savedTaskLock.getVersion(),
+ savedTaskLockWithPriority.getInterval(),
+ savedTaskLockWithPriority.getVersion(),
task.getId()
);
}
@@ -382,11 +388,14 @@ private TaskLockPosse createOrFindLockPosse(Task task,
TaskLock taskLock)
taskLock.getDataSource(),
task.getDataSource()
);
+ final int taskPriority = task.getPriority();
+ final int lockPriority = taskLock.getNonNullPriority();
+
Preconditions.checkArgument(
- task.getPriority() == taskLock.getPriority(),
+ lockPriority == taskPriority,
"lock priority[%s] is different from task priority[%s]",
- taskLock.getPriority(),
- task.getPriority()
+ lockPriority,
+ taskPriority
);
return createOrFindLockPosse(
@@ -396,7 +405,7 @@ private TaskLockPosse createOrFindLockPosse(Task task,
TaskLock taskLock)
taskLock.getDataSource(),
taskLock.getInterval(),
taskLock.getVersion(),
- taskLock.getPriority(),
+ taskPriority,
taskLock.isRevoked()
);
}
@@ -925,7 +934,7 @@ private static boolean isAllRevocable(List<TaskLockPosse>
lockPosses, int tryLoc
private static boolean isRevocable(TaskLockPosse lockPosse, int
tryLockPriority)
{
final TaskLock existingLock = lockPosse.getTaskLock();
- return existingLock.isRevoked() || existingLock.getPriority() <
tryLockPriority;
+ return existingLock.isRevoked() || existingLock.getNonNullPriority() <
tryLockPriority;
}
private TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task,
Interval interval)
@@ -986,7 +995,7 @@ TaskLock getTaskLock()
boolean addTask(Task task)
{
Preconditions.checkArgument(taskLock.getGroupId().equals(task.getGroupId()));
- Preconditions.checkArgument(taskLock.getPriority() ==
task.getPriority());
+ Preconditions.checkArgument(taskLock.getNonNullPriority() ==
task.getPriority());
return taskIds.add(task.getId());
}
diff --git
a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
index aeb31418b4c..8e4be31b869 100644
---
a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
+++
b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
@@ -41,7 +41,6 @@
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionHolder;
import io.druid.indexing.common.task.Task;
-import io.druid.indexing.common.task.Tasks;
import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue;
@@ -165,11 +164,6 @@ public Response taskPost(
public Response apply(TaskQueue taskQueue)
{
try {
- // Set default priority if needed
- final Integer priority =
task.getContextValue(Tasks.PRIORITY_KEY);
- if (priority == null) {
- task.addToContext(Tasks.PRIORITY_KEY,
task.getDefaultPriority());
- }
taskQueue.add(task);
return Response.ok(ImmutableMap.of("task",
task.getId())).build();
}
diff --git
a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
index 0aa90d6401e..ed16b9be6f5 100644
---
a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
+++
b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
@@ -19,6 +19,9 @@
package io.druid.indexing.overlord;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import io.druid.indexing.common.TaskLock;
@@ -261,6 +264,84 @@ public void testSyncFromStorage() throws
EntryExistsException
Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage);
}
+ @Test
+ public void testSyncFromStorageWithMissingTaskLockPriority() throws
EntryExistsException
+ {
+ final Task task = NoopTask.create();
+ taskStorage.insert(task, TaskStatus.running(task.getId()));
+ taskStorage.addLock(
+ task.getId(),
+ new TaskLockWithoutPriority(task.getGroupId(), task.getDataSource(),
Intervals.of("2017/2018"), "v1")
+ );
+
+ final List<TaskLock> beforeLocksInStorage =
taskStorage.getActiveTasks().stream()
+ .flatMap(t ->
taskStorage.getLocks(t.getId()).stream())
+
.collect(Collectors.toList());
+
+ final TaskLockbox lockbox = new TaskLockbox(taskStorage);
+ lockbox.syncFromStorage();
+
+ final List<TaskLock> afterLocksInStorage =
taskStorage.getActiveTasks().stream()
+ .flatMap(t ->
taskStorage.getLocks(t.getId()).stream())
+
.collect(Collectors.toList());
+
+ Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage);
+ }
+
+ @Test
+ public void testSyncFromStorageWithMissingTaskPriority() throws
EntryExistsException
+ {
+ final Task task = NoopTask.create();
+ taskStorage.insert(task, TaskStatus.running(task.getId()));
+ taskStorage.addLock(
+ task.getId(),
+ new TaskLock(
+ TaskLockType.EXCLUSIVE,
+ task.getGroupId(),
+ task.getDataSource(),
+ Intervals.of("2017/2018"),
+ "v1",
+ task.getPriority()
+ )
+ );
+
+ final List<TaskLock> beforeLocksInStorage =
taskStorage.getActiveTasks().stream()
+ .flatMap(t ->
taskStorage.getLocks(t.getId()).stream())
+
.collect(Collectors.toList());
+
+ final TaskLockbox lockbox = new TaskLockbox(taskStorage);
+ lockbox.syncFromStorage();
+
+ final List<TaskLock> afterLocksInStorage =
taskStorage.getActiveTasks().stream()
+ .flatMap(t ->
taskStorage.getLocks(t.getId()).stream())
+
.collect(Collectors.toList());
+
+ Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage);
+ }
+
+ @Test
+ public void testSyncFromStorageWithInvalidPriority() throws
EntryExistsException
+ {
+ final Task task = NoopTask.create();
+ taskStorage.insert(task, TaskStatus.running(task.getId()));
+ taskStorage.addLock(
+ task.getId(),
+ new TaskLock(
+ TaskLockType.EXCLUSIVE,
+ task.getGroupId(),
+ task.getDataSource(),
+ Intervals.of("2017/2018"),
+ "v1",
+ 10
+ )
+ );
+
+ final TaskLockbox lockbox = new TaskLockbox(taskStorage);
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("lock priority[10] is different from task
priority[50]");
+ lockbox.syncFromStorage();
+ }
+
@Test
public void testRevokedLockSyncFromStorage() throws EntryExistsException
{
@@ -504,4 +585,67 @@ public void testUnlock() throws EntryExistsException
.flatMap(task -> taskStorage.getLocks(task.getId()).stream())
.collect(Collectors.toSet());
}
+
+ private static class TaskLockWithoutPriority extends TaskLock
+ {
+ @JsonCreator
+ TaskLockWithoutPriority(
+ String groupId,
+ String dataSource,
+ Interval interval,
+ String version
+ )
+ {
+ super(null, groupId, dataSource, interval, version, 0, false);
+ }
+
+ @Override
+ @JsonProperty
+ public TaskLockType getType()
+ {
+ return super.getType();
+ }
+
+ @Override
+ @JsonProperty
+ public String getGroupId()
+ {
+ return super.getGroupId();
+ }
+
+ @Override
+ @JsonProperty
+ public String getDataSource()
+ {
+ return super.getDataSource();
+ }
+
+ @Override
+ @JsonProperty
+ public Interval getInterval()
+ {
+ return super.getInterval();
+ }
+
+ @Override
+ @JsonProperty
+ public String getVersion()
+ {
+ return super.getVersion();
+ }
+
+ @JsonIgnore
+ @Override
+ public Integer getPriority()
+ {
+ return super.getPriority();
+ }
+
+ @JsonIgnore
+ @Override
+ public boolean isRevoked()
+ {
+ return super.isRevoked();
+ }
+ }
}
diff --git
a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
index f389351ed38..8bfdc3cb23b 100644
---
a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
+++
b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
@@ -37,7 +37,6 @@
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
-import io.druid.indexing.common.task.Tasks;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import io.druid.indexing.overlord.TaskLockbox;
@@ -81,7 +80,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -232,12 +230,6 @@ public void testOverlordRun() throws Exception
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("task", taskId_0),
response.getEntity());
- final Map<String, Object> context = task_0.getContext();
- Assert.assertEquals(1, context.size());
- final Integer priority = (Integer) context.get(Tasks.PRIORITY_KEY);
- Assert.assertNotNull(priority);
- Assert.assertEquals(Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY,
priority.intValue());
-
// Duplicate task - should fail
response = overlordResource.taskPost(task_0, req);
Assert.assertEquals(400, response.getStatus());
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]