This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 08b5a8b88e9 Ignore append locks for compaction when using concurrent 
locks (#16316)
08b5a8b88e9 is described below

commit 08b5a8b88e942f0c7393a61f5cdbc6959924886e
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Mon Apr 22 23:26:45 2024 +0530

    Ignore append locks for compaction when using concurrent locks (#16316)
    
    * Ignore append locks for compaction when using concurrent locks
---
 .../druid/indexing/overlord/TaskLockbox.java       | 15 ++++++++++--
 .../druid/indexing/overlord/TaskLockboxTest.java   | 28 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 2 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 35ec79d74ec..7248fcab865 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -960,8 +960,19 @@ public class TaskLockbox
             }
 
             final int priority = lockFilter.getPriority();
-            final boolean ignoreAppendLocks =
-                
TaskLockType.REPLACE.name().equals(lockFilter.getContext().get(Tasks.TASK_LOCK_TYPE));
+            final boolean isReplaceLock = TaskLockType.REPLACE.name().equals(
+                lockFilter.getContext().getOrDefault(
+                    Tasks.TASK_LOCK_TYPE,
+                    Tasks.DEFAULT_TASK_LOCK_TYPE
+                )
+            );
+            final boolean isUsingConcurrentLocks = Boolean.TRUE.equals(
+                lockFilter.getContext().getOrDefault(
+                    Tasks.USE_CONCURRENT_LOCKS,
+                    Tasks.DEFAULT_USE_CONCURRENT_LOCKS
+                )
+            );
+            final boolean ignoreAppendLocks = isUsingConcurrentLocks || 
isReplaceLock;
 
             running.get(datasource).forEach(
                 (startTime, startTimeLocks) -> startTimeLocks.forEach(
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 999d4d0abb2..ab4bf3a504f 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -1325,6 +1325,34 @@ public class TaskLockboxTest
     Assert.assertTrue(conflictingIntervals.isEmpty());
   }
 
+  @Test
+  public void testGetLockedIntervalsForLowerPriorityUseConcurrentLocks()
+  {
+    final Task task = NoopTask.ofPriority(50);
+    lockbox.add(task);
+    taskStorage.insert(task, TaskStatus.running(task.getId()));
+    tryTimeChunkLock(
+        TaskLockType.APPEND,
+        task,
+        Intervals.of("2017/2018")
+    );
+
+    LockFilterPolicy requestForReplaceLowerPriorityLock = new LockFilterPolicy(
+        task.getDataSource(),
+        25,
+        ImmutableMap.of(
+            Tasks.TASK_LOCK_TYPE,
+            TaskLockType.EXCLUSIVE.name(),
+            Tasks.USE_CONCURRENT_LOCKS,
+            true
+        )
+    );
+
+    Map<String, List<Interval>> conflictingIntervals =
+        
lockbox.getLockedIntervals(ImmutableList.of(requestForReplaceLowerPriorityLock));
+    Assert.assertTrue(conflictingIntervals.isEmpty());
+  }
+
 
   @Test
   public void testExclusiveLockCompatibility()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to