This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 92a40d81699 Add API to fetch conflicting task locks (#16799)
92a40d81699 is described below
commit 92a40d81699742012deb795dabba29fdffe683c1
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Tue Jul 30 11:40:48 2024 +0530
Add API to fetch conflicting task locks (#16799)
* Add API to fetch conflicting active locks
---
.../druid/indexing/overlord/TaskLockbox.java | 92 +++---
.../druid/indexing/overlord/TaskQueryTool.java | 16 +-
.../indexing/overlord/http/OverlordResource.java | 17 +-
.../indexing/overlord/http/TaskLockResponse.java | 54 ++++
.../druid/indexing/overlord/TaskLockboxTest.java | 307 ++++++++++++---------
.../overlord/http/OverlordResourceTest.java | 82 +++++-
.../druid/testsEx/indexer/ITIndexerTest.java | 5 +-
.../clients/OverlordResourceTestClient.java | 7 +-
.../duty/ITAutoCompactionLockContentionTest.java | 7 +-
.../apache/druid/tests/indexer/ITIndexerTest.java | 5 +-
.../apache/druid/metadata/LockFilterPolicy.java | 31 +--
.../server/coordinator/duty/CompactSegments.java | 3 +-
.../druid/rpc/indexing/OverlordClientImplTest.java | 4 +-
13 files changed, 396 insertions(+), 234 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 2155ac2c265..bebb52157d6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -42,6 +42,7 @@ import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
@@ -49,6 +50,7 @@ import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.ReplaceTaskLock;
+import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -992,50 +994,76 @@ public class TaskLockbox
}
/**
- * Gets a List of Intervals locked by higher priority tasks for each
datasource.
- * Here, Segment Locks are being treated the same as Time Chunk Locks i.e.
- * a Task with a Segment Lock is assumed to lock a whole Interval and not
just
- * the corresponding Segment.
- *
- * @param minTaskPriority Minimum task priority for each datasource. Only the
- * Intervals that are locked by Tasks with equal or
- * higher priority than this are returned. Locked
intervals
- * for datasources that are not present in this Map
are
- * not returned.
- * @return Map from Datasource to List of Intervals locked by Tasks that have
- * priority greater than or equal to the {@code minTaskPriority} for that
datasource.
+ * @param lockFilterPolicies Lock filters for the given datasources
+ * @return Map from datasource to list of non-revoked locks with at least as much priority and an overlapping interval
*/
- public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer>
minTaskPriority)
+ public Map<String, List<TaskLock>> getActiveLocks(List<LockFilterPolicy>
lockFilterPolicies)
{
- final Map<String, Set<Interval>> datasourceToIntervals = new HashMap<>();
+ final Map<String, List<TaskLock>> datasourceToLocks = new HashMap<>();
// Take a lock and populate the maps
giant.lock();
+
try {
- running.forEach(
- (datasource, datasourceLocks) -> {
- // If this datasource is not requested, do not proceed
- if (!minTaskPriority.containsKey(datasource)) {
+ lockFilterPolicies.forEach(
+ lockFilter -> {
+ final String datasource = lockFilter.getDatasource();
+ if (!running.containsKey(datasource)) {
return;
}
- datasourceLocks.forEach(
+ final int priority = lockFilter.getPriority();
+ final List<Interval> intervals;
+ if (lockFilter.getIntervals() != null) {
+ intervals = lockFilter.getIntervals();
+ } else {
+ intervals = Collections.singletonList(Intervals.ETERNITY);
+ }
+
+ final Map<String, Object> context = lockFilter.getContext();
+ final boolean ignoreAppendLocks;
+ final Boolean useConcurrentLocks = QueryContexts.getAsBoolean(
+ Tasks.USE_CONCURRENT_LOCKS,
+ context.get(Tasks.USE_CONCURRENT_LOCKS)
+ );
+ if (useConcurrentLocks == null) {
+ TaskLockType taskLockType = QueryContexts.getAsEnum(
+ Tasks.TASK_LOCK_TYPE,
+ context.get(Tasks.TASK_LOCK_TYPE),
+ TaskLockType.class
+ );
+ if (taskLockType == null) {
+ ignoreAppendLocks = Tasks.DEFAULT_USE_CONCURRENT_LOCKS;
+ } else {
+ ignoreAppendLocks = taskLockType == TaskLockType.APPEND;
+ }
+ } else {
+ ignoreAppendLocks = useConcurrentLocks;
+ }
+
+ running.get(datasource).forEach(
(startTime, startTimeLocks) -> startTimeLocks.forEach(
(interval, taskLockPosses) -> taskLockPosses.forEach(
taskLockPosse -> {
if (taskLockPosse.getTaskLock().isRevoked()) {
- // Do not proceed if the lock is revoked
- return;
+ // do nothing
} else if (taskLockPosse.getTaskLock().getPriority()
== null
- ||
taskLockPosse.getTaskLock().getPriority() < minTaskPriority.get(datasource)) {
- // Do not proceed if the lock has a priority
strictly less than the minimum
- return;
+ ||
taskLockPosse.getTaskLock().getPriority() < priority) {
+ // do nothing
+ } else if (ignoreAppendLocks
+ && taskLockPosse.getTaskLock().getType()
== TaskLockType.APPEND) {
+ // do nothing
+ } else {
+ for (Interval filterInterval : intervals) {
+ if (interval.overlaps(filterInterval)) {
+ datasourceToLocks.computeIfAbsent(datasource,
ds -> new ArrayList<>())
+
.add(taskLockPosse.getTaskLock());
+ break;
+ }
+ }
}
-
- datasourceToIntervals
- .computeIfAbsent(datasource, k -> new
HashSet<>())
- .add(interval);
- })
+ }
+ )
)
);
}
@@ -1045,11 +1073,7 @@ public class TaskLockbox
giant.unlock();
}
- return datasourceToIntervals.entrySet().stream()
- .collect(Collectors.toMap(
- Map.Entry::getKey,
- entry -> new ArrayList<>(entry.getValue())
- ));
+ return datasourceToLocks;
}
public void unlock(final Task task, final Interval interval)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
index f5351d7c6e5..b25bde067c7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
@@ -27,6 +27,7 @@ import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.http.TaskStateLookup;
@@ -94,19 +95,12 @@ public class TaskQueryTool
}
/**
- * Gets a List of Intervals locked by higher priority tasks for each
datasource.
- *
- * @param minTaskPriority Minimum task priority for each datasource. Only the
- * Intervals that are locked by Tasks with equal or
- * higher priority than this are returned. Locked
intervals
- * for datasources that are not present in this Map
are
- * not returned.
- * @return Map from Datasource to List of Intervals locked by Tasks that have
- * priority greater than or equal to the {@code minTaskPriority} for that
datasource.
+ * @param lockFilterPolicies Requests for active locks for various datasources
+ * @return Map from datasource to conflicting lock infos
*/
- public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer>
minTaskPriority)
+ public Map<String, List<TaskLock>> getActiveLocks(List<LockFilterPolicy>
lockFilterPolicies)
{
- return taskLockbox.getLockedIntervals(minTaskPriority);
+ return taskLockbox.getActiveLocks(lockFilterPolicies);
}
public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String
dataSource)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index 54ada7cb2b4..b62e1de055f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -241,33 +241,32 @@ public class OverlordResource
}
}
- @Deprecated
@POST
- @Path("/lockedIntervals")
+ @Path("/lockedIntervals/v2")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(StateResourceFilter.class)
- public Response getDatasourceLockedIntervals(Map<String, Integer>
minTaskPriority)
+ public Response getDatasourceLockedIntervals(List<LockFilterPolicy>
lockFilterPolicies)
{
- if (minTaskPriority == null || minTaskPriority.isEmpty()) {
- return Response.status(Status.BAD_REQUEST).entity("No Datasource
provided").build();
+ if (lockFilterPolicies == null || lockFilterPolicies.isEmpty()) {
+ return Response.status(Status.BAD_REQUEST).entity("No filter
provided").build();
}
// Build the response
- return
Response.ok(taskQueryTool.getLockedIntervals(minTaskPriority)).build();
+ return
Response.ok(taskQueryTool.getLockedIntervals(lockFilterPolicies)).build();
}
@POST
- @Path("/lockedIntervals/v2")
+ @Path("/activeLocks")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(StateResourceFilter.class)
- public Response getDatasourceLockedIntervalsV2(List<LockFilterPolicy>
lockFilterPolicies)
+ public Response getActiveLocks(List<LockFilterPolicy> lockFilterPolicies)
{
if (lockFilterPolicies == null || lockFilterPolicies.isEmpty()) {
return Response.status(Status.BAD_REQUEST).entity("No filter
provided").build();
}
// Build the response
- return
Response.ok(taskQueryTool.getLockedIntervals(lockFilterPolicies)).build();
+ return Response.ok(new
TaskLockResponse(taskQueryTool.getActiveLocks(lockFilterPolicies))).build();
}
@GET
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TaskLockResponse.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TaskLockResponse.java
new file mode 100644
index 00000000000..df4a5d8b03c
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TaskLockResponse.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexing.common.TaskLock;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * JSON-serializable holder for task locks grouped by datasource. It is the
+ * entity returned by the Overlord {@code /activeLocks} endpoint.
+ */
+public class TaskLockResponse
+{
+  private final Map<String, List<TaskLock>> datasourceToLocks;
+
+  /**
+   * @param datasourceToLocks map from datasource name to its locks; the
+   *                          reference is stored as given, without copying
+   */
+  @JsonCreator
+  public TaskLockResponse(
+      @JsonProperty("datasourceToLocks") final Map<String, List<TaskLock>> datasourceToLocks
+  )
+  {
+    this.datasourceToLocks = datasourceToLocks;
+  }
+
+  /**
+   * @return the map that was passed to the constructor
+   */
+  @JsonProperty
+  public Map<String, List<TaskLock>> getDatasourceToLocks()
+  {
+    return datasourceToLocks;
+  }
+
+  @Override
+  public String toString()
+  {
+    // The map is printed unquoted: an opening single quote before the value
+    // with no closing quote after it produced an unbalanced string.
+    return "TaskLockResponse{" +
+           "datasourceToLocks=" + datasourceToLocks +
+           '}';
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index a02b5108767..a8c4b5117b1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -75,7 +75,6 @@ import org.junit.Rule;
import org.junit.Test;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -1174,99 +1173,6 @@ public class TaskLockboxTest
);
}
- @Test
- public void testGetLockedIntervals()
- {
- // Acquire locks for task1
- final Task task1 = NoopTask.forDatasource("ds1");
- lockbox.add(task1);
-
- tryTimeChunkLock(
- TaskLockType.EXCLUSIVE,
- task1,
- Intervals.of("2017-01-01/2017-02-01")
- );
- tryTimeChunkLock(
- TaskLockType.EXCLUSIVE,
- task1,
- Intervals.of("2017-04-01/2017-05-01")
- );
-
- // Acquire locks for task2
- final Task task2 = NoopTask.forDatasource("ds2");
- lockbox.add(task2);
- tryTimeChunkLock(
- TaskLockType.EXCLUSIVE,
- task2,
- Intervals.of("2017-03-01/2017-04-01")
- );
-
- // Verify the locked intervals
- final Map<String, Integer> minTaskPriority = new HashMap<>();
- minTaskPriority.put(task1.getDataSource(), 10);
- minTaskPriority.put(task2.getDataSource(), 10);
- final Map<String, List<Interval>> lockedIntervals =
lockbox.getLockedIntervals(minTaskPriority);
- Assert.assertEquals(2, lockedIntervals.size());
-
- Assert.assertEquals(
- Arrays.asList(
- Intervals.of("2017-01-01/2017-02-01"),
- Intervals.of("2017-04-01/2017-05-01")
- ),
- lockedIntervals.get(task1.getDataSource())
- );
-
- Assert.assertEquals(
- Collections.singletonList(
- Intervals.of("2017-03-01/2017-04-01")),
- lockedIntervals.get(task2.getDataSource())
- );
- }
-
- @Test
- public void testGetLockedIntervalsForLowPriorityTask()
- {
- // Acquire lock for a low priority task
- final Task lowPriorityTask = NoopTask.ofPriority(5);
- lockbox.add(lowPriorityTask);
- taskStorage.insert(lowPriorityTask,
TaskStatus.running(lowPriorityTask.getId()));
- tryTimeChunkLock(
- TaskLockType.EXCLUSIVE,
- lowPriorityTask,
- Intervals.of("2017/2018")
- );
-
- final Map<String, Integer> minTaskPriority = new HashMap<>();
- minTaskPriority.put(lowPriorityTask.getDataSource(), 10);
-
- Map<String, List<Interval>> lockedIntervals =
lockbox.getLockedIntervals(minTaskPriority);
- Assert.assertTrue(lockedIntervals.isEmpty());
- }
-
- @Test
- public void testGetLockedIntervalsForEqualPriorityTask()
- {
- // Acquire lock for a low priority task
- final Task task = NoopTask.ofPriority(5);
- lockbox.add(task);
- taskStorage.insert(task, TaskStatus.running(task.getId()));
- tryTimeChunkLock(
- TaskLockType.EXCLUSIVE,
- task,
- Intervals.of("2017/2018")
- );
-
- final Map<String, Integer> minTaskPriority = new HashMap<>();
- minTaskPriority.put(task.getDataSource(), 5);
-
- Map<String, List<Interval>> lockedIntervals =
lockbox.getLockedIntervals(minTaskPriority);
- Assert.assertEquals(1, lockedIntervals.size());
- Assert.assertEquals(
- Collections.singletonList(Intervals.of("2017/2018")),
- lockedIntervals.get(task.getDataSource())
- );
- }
-
@Test
public void testGetLockedIntervalsForHigherPriorityExclusiveLock()
{
@@ -1282,6 +1188,7 @@ public class TaskLockboxTest
LockFilterPolicy requestForExclusiveLowerPriorityLock = new
LockFilterPolicy(
task.getDataSource(),
75,
+ null,
null
);
@@ -1305,6 +1212,7 @@ public class TaskLockboxTest
LockFilterPolicy requestForExclusiveLowerPriorityLock = new
LockFilterPolicy(
task.getDataSource(),
25,
+ null,
null
);
@@ -1332,6 +1240,7 @@ public class TaskLockboxTest
LockFilterPolicy requestForReplaceLowerPriorityLock = new LockFilterPolicy(
task.getDataSource(),
25,
+ null,
ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.REPLACE.name())
);
@@ -1355,6 +1264,7 @@ public class TaskLockboxTest
LockFilterPolicy requestForReplaceLowerPriorityLock = new LockFilterPolicy(
task.getDataSource(),
25,
+ null,
ImmutableMap.of(
Tasks.TASK_LOCK_TYPE,
TaskLockType.EXCLUSIVE.name(),
@@ -1369,6 +1279,171 @@ public class TaskLockboxTest
}
+ @Test
+ public void testGetActiveLocks()
+ {
+ final Set<TaskLock> expectedLocks = new HashSet<>();
+ final TaskLock overlappingReplaceLock =
+ validator.expectLockCreated(TaskLockType.REPLACE,
Intervals.of("2024/2025"), 50);
+ expectedLocks.add(overlappingReplaceLock);
+
+ //Lower priority
+ validator.expectLockCreated(TaskLockType.APPEND,
Intervals.of("2024/2025"), 25);
+
+ final TaskLock overlappingAppendLock =
+ validator.expectLockCreated(TaskLockType.APPEND,
Intervals.of("2024-01-01/2024-02-01"), 75);
+ expectedLocks.add(overlappingAppendLock);
+
+ // Non-overlapping interval
+ validator.expectLockCreated(TaskLockType.APPEND,
Intervals.of("2024-12-01/2025-01-01"), 75);
+
+ final TaskLock overlappingExclusiveLock =
+ validator.expectLockCreated(TaskLockType.EXCLUSIVE,
Intervals.of("2020/2021"), 50);
+ expectedLocks.add(overlappingExclusiveLock);
+
+ LockFilterPolicy policy = new LockFilterPolicy(
+ "none",
+ 50,
+ ImmutableList.of(Intervals.of("2020/2021"),
Intervals.of("2024-01-01/2024-07-01")),
+ null
+ );
+
+ LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy(
+ "nonExistent",
+ 0,
+ null,
+ null
+ );
+
+ Map<String, List<TaskLock>> activeLocks =
+ lockbox.getActiveLocks(ImmutableList.of(policy,
policyForNonExistentDatasource));
+ Assert.assertEquals(1, activeLocks.size());
+ Assert.assertEquals(expectedLocks, new HashSet<>(activeLocks.get("none")));
+ }
+
+ @Test
+ public void testGetActiveLocksWithAppendLockIgnoresAppendLocks()
+ {
+ final Set<TaskLock> expectedLocks = new HashSet<>();
+ final TaskLock overlappingReplaceLock =
+ validator.expectLockCreated(TaskLockType.REPLACE,
Intervals.of("2024/2025"), 50);
+ expectedLocks.add(overlappingReplaceLock);
+
+ //Lower priority
+ validator.expectLockCreated(TaskLockType.APPEND,
Intervals.of("2024/2025"), 25);
+
+ validator.expectLockCreated(TaskLockType.APPEND,
Intervals.of("2024-01-01/2024-02-01"), 75);
+
+ // Non-overlapping interval
+ validator.expectLockCreated(TaskLockType.APPEND,
Intervals.of("2024-12-01/2025-01-01"), 75);
+
+ final TaskLock overlappingExclusiveLock =
+ validator.expectLockCreated(TaskLockType.EXCLUSIVE,
Intervals.of("2020/2021"), 50);
+ expectedLocks.add(overlappingExclusiveLock);
+
+ LockFilterPolicy policy = new LockFilterPolicy(
+ "none",
+ 50,
+ ImmutableList.of(Intervals.of("2020/2021"),
Intervals.of("2024-01-01/2024-07-01")),
+ ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.APPEND.name())
+ );
+
+ LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy(
+ "nonExistent",
+ 0,
+ null,
+ null
+ );
+
+ Map<String, List<TaskLock>> activeLocks =
+ lockbox.getActiveLocks(ImmutableList.of(policy,
policyForNonExistentDatasource));
+ Assert.assertEquals(1, activeLocks.size());
+ Assert.assertEquals(expectedLocks, new HashSet<>(activeLocks.get("none")));
+ }
+
+ @Test
+ public void testGetActiveLocksWithConcurrentLocksIgnoresAppendLocks()
+ {
+ final Set<TaskLock> expectedLocks = new HashSet<>();
+ final TaskLock overlappingReplaceLock =
+ validator.expectLockCreated(TaskLockType.REPLACE,
Intervals.of("2024/2025"), 50);
+ expectedLocks.add(overlappingReplaceLock);
+
+ //Lower priority
+ validator.expectLockCreated(TaskLockType.APPEND,
Intervals.of("2024/2025"), 25);
+
+ validator.expectLockCreated(TaskLockType.APPEND,
Intervals.of("2024-01-01/2024-02-01"), 75);
+
+ // Non-overlapping interval
+ validator.expectLockCreated(TaskLockType.APPEND,
Intervals.of("2024-12-01/2025-01-01"), 75);
+
+ final TaskLock overlappingExclusiveLock =
+ validator.expectLockCreated(TaskLockType.EXCLUSIVE,
Intervals.of("2020/2021"), 50);
+ expectedLocks.add(overlappingExclusiveLock);
+
+ LockFilterPolicy policy = new LockFilterPolicy(
+ "none",
+ 50,
+ ImmutableList.of(Intervals.of("2020/2021"),
Intervals.of("2024-01-01/2024-07-01")),
+ ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, true,
Tasks.TASK_LOCK_TYPE, TaskLockType.EXCLUSIVE.name())
+ );
+
+ LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy(
+ "nonExistent",
+ 0,
+ null,
+ null
+ );
+
+ Map<String, List<TaskLock>> activeLocks =
+ lockbox.getActiveLocks(ImmutableList.of(policy,
policyForNonExistentDatasource));
+ Assert.assertEquals(1, activeLocks.size());
+ Assert.assertEquals(expectedLocks, new HashSet<>(activeLocks.get("none")));
+ }
+
+ @Test
+ public void testGetActiveLocksWithoutConcurrentLocksConsidersAppendLocks()
+ {
+ final Set<TaskLock> expectedLocks = new HashSet<>();
+ final TaskLock overlappingReplaceLock =
+ validator.expectLockCreated(TaskLockType.REPLACE,
Intervals.of("2024/2025"), 50);
+
+ expectedLocks.add(overlappingReplaceLock);
+
+ //Lower priority
+ validator.expectLockCreated(TaskLockType.APPEND,
Intervals.of("2024/2025"), 25);
+
+ final TaskLock overlappingAppendLock =
+ validator.expectLockCreated(TaskLockType.APPEND,
Intervals.of("2024-01-01/2024-02-01"), 75);
+ expectedLocks.add(overlappingAppendLock);
+
+ // Non-overlapping interval
+ validator.expectLockCreated(TaskLockType.APPEND,
Intervals.of("2024-12-01/2025-01-01"), 75);
+
+ final TaskLock overlappingExclusiveLock =
+ validator.expectLockCreated(TaskLockType.EXCLUSIVE,
Intervals.of("2020/2021"), 50);
+ expectedLocks.add(overlappingExclusiveLock);
+
+ LockFilterPolicy policy = new LockFilterPolicy(
+ "none",
+ 50,
+ ImmutableList.of(Intervals.of("2020/2021"),
Intervals.of("2024-01-01/2024-07-01")),
+ ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, false,
Tasks.TASK_LOCK_TYPE, TaskLockType.APPEND.name())
+ );
+
+ LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy(
+ "nonExistent",
+ 0,
+ null,
+ null
+ );
+
+ Map<String, List<TaskLock>> activeLocks =
+ lockbox.getActiveLocks(ImmutableList.of(policy,
policyForNonExistentDatasource));
+ Assert.assertEquals(1, activeLocks.size());
+ Assert.assertEquals(expectedLocks, new HashSet<>(activeLocks.get("none")));
+ }
+
@Test
public void testExclusiveLockCompatibility()
{
@@ -1770,50 +1845,6 @@ public class TaskLockboxTest
validator.expectLockNotGranted(TaskLockType.APPEND, otherGroupTask,
Intervals.of("2024/2025"));
}
- @Test
- public void testGetLockedIntervalsForRevokedLocks()
- {
- // Acquire lock for a low priority task
- final Task lowPriorityTask = NoopTask.ofPriority(5);
- lockbox.add(lowPriorityTask);
- taskStorage.insert(lowPriorityTask,
TaskStatus.running(lowPriorityTask.getId()));
- tryTimeChunkLock(
- TaskLockType.EXCLUSIVE,
- lowPriorityTask,
- Intervals.of("2017/2018")
- );
-
- final Map<String, Integer> minTaskPriority = new HashMap<>();
- minTaskPriority.put(lowPriorityTask.getDataSource(), 1);
-
- Map<String, List<Interval>> lockedIntervals =
lockbox.getLockedIntervals(minTaskPriority);
- Assert.assertEquals(1, lockedIntervals.size());
- Assert.assertEquals(
- Collections.singletonList(
- Intervals.of("2017/2018")),
- lockedIntervals.get(lowPriorityTask.getDataSource())
- );
-
- // Revoke the lowPriorityTask
- final Task highPriorityTask = NoopTask.ofPriority(10);
- lockbox.add(highPriorityTask);
- tryTimeChunkLock(
- TaskLockType.EXCLUSIVE,
- highPriorityTask,
- Intervals.of("2017-05-01/2017-06-01")
- );
-
- // Verify the locked intervals
- minTaskPriority.put(highPriorityTask.getDataSource(), 1);
- lockedIntervals = lockbox.getLockedIntervals(minTaskPriority);
- Assert.assertEquals(1, lockedIntervals.size());
- Assert.assertEquals(
- Collections.singletonList(
- Intervals.of("2017-05-01/2017-06-01")),
- lockedIntervals.get(highPriorityTask.getDataSource())
- );
- }
-
@Test
public void testFailedToReacquireTaskLock()
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index e6dee0c7e40..93f80ac9709 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -36,6 +36,9 @@ import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
@@ -61,6 +64,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
@@ -1057,31 +1061,33 @@ public class OverlordResourceTest
@Test
public void testGetLockedIntervals() throws Exception
{
- final Map<String, Integer> minTaskPriority =
Collections.singletonMap("ds1", 0);
- final Map<String, List<Interval>> expectedLockedIntervals =
Collections.singletonMap(
+ final List<LockFilterPolicy> lockFilterPolicies = ImmutableList.of(
+ new LockFilterPolicy("ds1", 25, null, null)
+ );
+ final Map<String, List<Interval>> expectedIntervals =
Collections.singletonMap(
"ds1",
Arrays.asList(
Intervals.of("2012-01-01/2012-01-02"),
- Intervals.of("2012-01-02/2012-01-03")
+ Intervals.of("2012-01-01/2012-01-02")
)
);
- EasyMock.expect(taskLockbox.getLockedIntervals(minTaskPriority))
- .andReturn(expectedLockedIntervals);
+ EasyMock.expect(taskLockbox.getLockedIntervals(lockFilterPolicies))
+ .andReturn(expectedIntervals);
replayAll();
- final Response response =
overlordResource.getDatasourceLockedIntervals(minTaskPriority);
+ final Response response =
overlordResource.getDatasourceLockedIntervals(lockFilterPolicies);
Assert.assertEquals(200, response.getStatus());
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
- Map<String, List<Interval>> observedLockedIntervals = jsonMapper.readValue(
+ Map<String, List<Interval>> observedIntervals = jsonMapper.readValue(
jsonMapper.writeValueAsString(response.getEntity()),
new TypeReference<Map<String, List<Interval>>>()
{
}
);
- Assert.assertEquals(expectedLockedIntervals, observedLockedIntervals);
+ Assert.assertEquals(expectedIntervals, observedIntervals);
}
@Test
@@ -1092,7 +1098,65 @@ public class OverlordResourceTest
Response response = overlordResource.getDatasourceLockedIntervals(null);
Assert.assertEquals(400, response.getStatus());
- response =
overlordResource.getDatasourceLockedIntervals(Collections.emptyMap());
+ response =
overlordResource.getDatasourceLockedIntervals(Collections.emptyList());
+ Assert.assertEquals(400, response.getStatus());
+ }
+
+ @Test
+ public void testGetActiveLocks() throws Exception
+ {
+ final List<LockFilterPolicy> lockFilterPolicies = ImmutableList.of(
+ new LockFilterPolicy("ds1", 25, null, null)
+ );
+ final Map<String, List<TaskLock>> expectedLocks = Collections.singletonMap(
+ "ds1",
+ Arrays.asList(
+ new TimeChunkLock(
+ TaskLockType.REPLACE,
+ "groupId",
+ "datasource",
+ Intervals.of("2012-01-01/2012-01-02"),
+ "version",
+ 25
+ ),
+ new TimeChunkLock(
+ TaskLockType.EXCLUSIVE,
+ "groupId",
+ "datasource",
+ Intervals.of("2012-01-02/2012-01-03"),
+ "version",
+ 75
+ )
+ )
+ );
+
+ EasyMock.expect(taskLockbox.getActiveLocks(lockFilterPolicies))
+ .andReturn(expectedLocks);
+ replayAll();
+
+ final Response response =
overlordResource.getActiveLocks(lockFilterPolicies);
+ Assert.assertEquals(200, response.getStatus());
+
+ final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
+ Map<String, List<TaskLock>> observedLocks = jsonMapper.readValue(
+ jsonMapper.writeValueAsString(response.getEntity()),
+ new TypeReference<TaskLockResponse>()
+ {
+ }
+ ).getDatasourceToLocks();
+
+ Assert.assertEquals(expectedLocks, observedLocks);
+ }
+
+ @Test
+ public void testGetActiveLocksWithEmptyBody()
+ {
+ replayAll();
+
+ Response response = overlordResource.getActiveLocks(null);
+ Assert.assertEquals(400, response.getStatus());
+
+ response = overlordResource.getActiveLocks(Collections.emptyList());
Assert.assertEquals(400, response.getStatus());
}
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java
index 65b8dc0b1ac..84f2dff1d79 100644
--- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java
@@ -24,6 +24,7 @@ import com.google.inject.Inject;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.utils.ITRetryUtil;
@@ -343,12 +344,12 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
submitIndexTask(INDEX_TASK, datasourceName);
// Wait until it acquires a lock
- final Map<String, Integer> minTaskPriority =
Collections.singletonMap(datasourceName, 0);
+ final List<LockFilterPolicy> lockFilterPolicies =
Collections.singletonList(new LockFilterPolicy(datasourceName, 0, null, null));
final Map<String, List<Interval>> lockedIntervals = new HashMap<>();
ITRetryUtil.retryUntilFalse(
() -> {
lockedIntervals.clear();
-
lockedIntervals.putAll(indexer.getLockedIntervals(minTaskPriority));
+
lockedIntervals.putAll(indexer.getLockedIntervals(lockFilterPolicies));
return lockedIntervals.isEmpty();
},
"Verify Intervals are Locked"
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index 8167b9b64e1..f75dc6043f9 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -39,6 +39,7 @@ import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.TestClient;
@@ -334,13 +335,13 @@ public class OverlordResourceTestClient
}
}
- public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> minTaskPriority)
+ public Map<String, List<Interval>> getLockedIntervals(List<LockFilterPolicy> lockFilterPolicies)
{
try {
- String jsonBody = jsonMapper.writeValueAsString(minTaskPriority);
+ String jsonBody = jsonMapper.writeValueAsString(lockFilterPolicies);
StatusResponseHolder response = httpClient.go(
- new Request(HttpMethod.POST, new URL(getIndexerURL() + "lockedIntervals"))
+ new Request(HttpMethod.POST, new URL(getIndexerURL() + "lockedIntervals/v2"))
.setContent(
"application/json",
StringUtils.toUtf8(jsonBody)
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
index 8d980d76f12..79d63cb4550 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
@@ -19,11 +19,13 @@
package org.apache.druid.tests.coordinator.duty;
+import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -53,7 +55,6 @@ import org.testng.annotations.Test;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -265,13 +266,13 @@ public class ITAutoCompactionLockContentionTest extends AbstractKafkaIndexingSer
*/
private void ensureLockedIntervals(Interval... intervals)
{
- final Map<String, Integer> minTaskPriority = Collections.singletonMap(fullDatasourceName, 0);
+ final LockFilterPolicy lockFilterPolicy = new LockFilterPolicy(fullDatasourceName, 0, null, null);
final List<Interval> lockedIntervals = new ArrayList<>();
ITRetryUtil.retryUntilTrue(
() -> {
lockedIntervals.clear();
- Map<String, List<Interval>> allIntervals = indexer.getLockedIntervals(minTaskPriority);
+ Map<String, List<Interval>> allIntervals = indexer.getLockedIntervals(ImmutableList.of(lockFilterPolicy));
if (allIntervals.containsKey(fullDatasourceName)) {
lockedIntervals.addAll(allIntervals.get(fullDatasourceName));
}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
index f527135c80d..dfe308e2c1b 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
@@ -24,6 +24,7 @@ import com.google.inject.Inject;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
@@ -342,12 +343,12 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
submitIndexTask(INDEX_TASK, datasourceName);
// Wait until it acquires a lock
- final Map<String, Integer> minTaskPriority = Collections.singletonMap(datasourceName, 0);
+ final List<LockFilterPolicy> lockFilterPolicies = Collections.singletonList(new LockFilterPolicy(datasourceName, 0, null, null));
final Map<String, List<Interval>> lockedIntervals = new HashMap<>();
ITRetryUtil.retryUntilFalse(
() -> {
lockedIntervals.clear();
- lockedIntervals.putAll(indexer.getLockedIntervals(minTaskPriority));
+ lockedIntervals.putAll(indexer.getLockedIntervals(lockFilterPolicies));
return lockedIntervals.isEmpty();
},
"Verify Intervals are Locked"
diff --git a/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java
index 88ab4673aa8..019fd22807c 100644
--- a/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java
+++ b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java
@@ -21,10 +21,12 @@ package org.apache.druid.metadata;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
-import java.util.Objects;
/**
* Specifies a policy to filter active locks held by a datasource
@@ -33,17 +35,20 @@ public class LockFilterPolicy
{
private final String datasource;
private final int priority;
+ private final List<Interval> intervals;
private final Map<String, Object> context;
@JsonCreator
public LockFilterPolicy(
@JsonProperty("datasource") String datasource,
@JsonProperty("priority") int priority,
- @JsonProperty("context") Map<String, Object> context
+ @JsonProperty("intervals") @Nullable List<Interval> intervals,
+ @JsonProperty("context") @Nullable Map<String, Object> context
)
{
this.datasource = datasource;
this.priority = priority;
+ this.intervals = intervals;
this.context = context == null ? Collections.emptyMap() : context;
}
@@ -65,24 +70,10 @@ public class LockFilterPolicy
return context;
}
- @Override
- public boolean equals(Object o)
- {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- LockFilterPolicy that = (LockFilterPolicy) o;
- return Objects.equals(datasource, that.datasource)
- && priority == that.priority
- && Objects.equals(context, that.context);
- }
-
- @Override
- public int hashCode()
+ @Nullable
+ @JsonProperty
+ public List<Interval> getIntervals()
{
- return Objects.hash(datasource, priority, context);
+ return intervals;
}
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 01f3bc77e9e..d6e25fc9ac6 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -278,7 +278,8 @@ public class CompactSegments implements CoordinatorCustomDuty
{
final List<LockFilterPolicy> lockFilterPolicies = compactionConfigs
.stream()
- .map(config -> new LockFilterPolicy(config.getDataSource(), config.getTaskPriority(), config.getTaskContext()))
+ .map(config ->
+ new LockFilterPolicy(config.getDataSource(), config.getTaskPriority(), null, config.getTaskContext()))
.collect(Collectors.toList());
final Map<String, List<Interval>> datasourceToLockedIntervals =
new HashMap<>(FutureUtils.getUnchecked(overlordClient.findLockedIntervals(lockFilterPolicies), true));
diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
index 8c3b867e368..5f583746266 100644
--- a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
+++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
@@ -225,7 +225,7 @@ public class OverlordClientImplTest
final Map<String, List<Interval>> lockMap =
ImmutableMap.of("foo", Collections.singletonList(Intervals.of("2000/2001")));
final List<LockFilterPolicy> requests = ImmutableList.of(
- new LockFilterPolicy("foo", 3, null)
+ new LockFilterPolicy("foo", 3, null, null)
);
serviceClient.expectAndRespond(
@@ -246,7 +246,7 @@ public class OverlordClientImplTest
public void test_findLockedIntervals_nullReturn() throws Exception
{
final List<LockFilterPolicy> requests = ImmutableList.of(
- new LockFilterPolicy("foo", 3, null)
+ new LockFilterPolicy("foo", 3, null, null)
);
serviceClient.expectAndRespond(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]