This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 08b5a8b88e9 Ignore append locks for compaction when using concurrent
locks (#16316)
08b5a8b88e9 is described below
commit 08b5a8b88e942f0c7393a61f5cdbc6959924886e
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Mon Apr 22 23:26:45 2024 +0530
Ignore append locks for compaction when using concurrent locks (#16316)
* Ignore append locks for compaction when using concurrent locks
---
.../druid/indexing/overlord/TaskLockbox.java | 15 ++++++++++--
.../druid/indexing/overlord/TaskLockboxTest.java | 28 ++++++++++++++++++++++
2 files changed, 41 insertions(+), 2 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 35ec79d74ec..7248fcab865 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -960,8 +960,19 @@ public class TaskLockbox
}
final int priority = lockFilter.getPriority();
- final boolean ignoreAppendLocks =
-
TaskLockType.REPLACE.name().equals(lockFilter.getContext().get(Tasks.TASK_LOCK_TYPE));
+ final boolean isReplaceLock = TaskLockType.REPLACE.name().equals(
+ lockFilter.getContext().getOrDefault(
+ Tasks.TASK_LOCK_TYPE,
+ Tasks.DEFAULT_TASK_LOCK_TYPE
+ )
+ );
+ final boolean isUsingConcurrentLocks = Boolean.TRUE.equals(
+ lockFilter.getContext().getOrDefault(
+ Tasks.USE_CONCURRENT_LOCKS,
+ Tasks.DEFAULT_USE_CONCURRENT_LOCKS
+ )
+ );
+ final boolean ignoreAppendLocks = isUsingConcurrentLocks ||
isReplaceLock;
running.get(datasource).forEach(
(startTime, startTimeLocks) -> startTimeLocks.forEach(
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 999d4d0abb2..ab4bf3a504f 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -1325,6 +1325,34 @@ public class TaskLockboxTest
Assert.assertTrue(conflictingIntervals.isEmpty());
}
+ @Test
+ public void testGetLockedIntervalsForLowerPriorityUseConcurrentLocks()
+ {
+ final Task task = NoopTask.ofPriority(50);
+ lockbox.add(task);
+ taskStorage.insert(task, TaskStatus.running(task.getId()));
+ tryTimeChunkLock(
+ TaskLockType.APPEND,
+ task,
+ Intervals.of("2017/2018")
+ );
+
+ LockFilterPolicy requestForReplaceLowerPriorityLock = new LockFilterPolicy(
+ task.getDataSource(),
+ 25,
+ ImmutableMap.of(
+ Tasks.TASK_LOCK_TYPE,
+ TaskLockType.EXCLUSIVE.name(),
+ Tasks.USE_CONCURRENT_LOCKS,
+ true
+ )
+ );
+
+ Map<String, List<Interval>> conflictingIntervals =
+
lockbox.getLockedIntervals(ImmutableList.of(requestForReplaceLowerPriorityLock));
+ Assert.assertTrue(conflictingIntervals.isEmpty());
+ }
+
@Test
public void testExclusiveLockCompatibility()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]