This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch 30.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/30.0.0 by this push:
     new 108709a907b Allow different timechunk lock types to coexist in a task group (#16369) (#16373)
108709a907b is described below

commit 108709a907bc33c5c22728e413038d0743bc2acc
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Thu May 2 23:23:25 2024 +0530

    Allow different timechunk lock types to coexist in a task group (#16369) (#16373)
    
    Description:
    All streaming ingestion tasks for a given datasource share the same lock for a given interval.
    Changing the lock type in the supervisor can therefore cause segment allocation errors for the
    new tasks: their lock requests conflict with the locks still held by the older tasks that are
    still running.
    
    Fix:
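    Allow locks of different types (EXCLUSIVE, SHARED, APPEND, REPLACE) to coexist if they have
    the same interval and the same task group. Previously, conflicting locks in the same group
    could coexist only when they differed in granularity (time chunk vs. segment lock).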
---
 .../druid/indexing/overlord/TaskLockbox.java       | 16 +++---
 .../druid/indexing/overlord/TaskLockboxTest.java   | 61 +++++++++++++++++++---
 2 files changed, 63 insertions(+), 14 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 1a1369ba1e5..1155592f325 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -579,18 +579,20 @@ public class TaskLockbox
               || revokeAllIncompatibleActiveLocksIfPossible(conflictPosses, 
request)) {
             posseToUse = createNewTaskLockPosse(request);
           } else {
-            // During a rolling update, tasks of mixed versions can be run at 
the same time. Old tasks would request
-            // timeChunkLocks while new tasks would ask segmentLocks. The 
below check is to allow for old and new tasks
-            // to get locks of different granularities if they have the same 
groupId.
-            final boolean allDifferentGranularity = conflictPosses
+            // When a rolling upgrade happens or lock types are changed for an 
ongoing Streaming ingestion supervisor,
+            // the existing tasks might have or request different lock 
granularities or types than the new ones.
+            // To ensure a smooth transition, we must allocate the different 
lock types for the new tasks
+            // so that they can coexist and ingest with the required locks.
+            final boolean allLocksHaveSameTaskGroupAndInterval = conflictPosses
                 .stream()
                 .allMatch(
-                    conflictPosse -> conflictPosse.taskLock.getGranularity() 
!= request.getGranularity()
-                                     && 
conflictPosse.getTaskLock().getGroupId().equals(request.getGroupId())
+                    conflictPosse -> 
conflictPosse.getTaskLock().getGroupId().equals(request.getGroupId())
                                      && 
conflictPosse.getTaskLock().getInterval().equals(request.getInterval())
                 );
-            if (allDifferentGranularity) {
+
+            if (allLocksHaveSameTaskGroupAndInterval) {
               // Lock collision was because of the different granularity in 
the same group.
+              // OR because of different lock types for exclusive locks within 
the same group
               // We can add a new taskLockPosse.
               posseToUse = createNewTaskLockPosse(request);
             } else {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 3c95709a9fc..e6b20dd073e 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -80,6 +80,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -1743,6 +1744,33 @@ public class TaskLockboxTest
     validator.expectRevokedLocks(appendLock0, appendLock2, exclusiveLock, 
replaceLock, sharedLock);
   }
 
+  @Test
+  public void testTimechunkLockTypeTransitionForSameTaskGroup()
+  {
+    Task task = NoopTask.create();
+    Task otherGroupTask = NoopTask.create();
+
+    // Create an exclusive lock
+    validator.expectLockCreated(TaskLockType.EXCLUSIVE, task, 
Intervals.of("2024/2025"));
+
+    // Verify that new locks are created for all other conflicting lock 
requests for the same interval and group
+    validator.expectLockCreated(TaskLockType.SHARED, task, 
Intervals.of("2024/2025"));
+    validator.expectLockCreated(TaskLockType.REPLACE, task, 
Intervals.of("2024/2025"));
+    validator.expectLockCreated(TaskLockType.APPEND, task, 
Intervals.of("2024/2025"));
+
+    // Conflicting locks for a different interval cannot be granted
+    validator.expectLockNotGranted(TaskLockType.EXCLUSIVE, task, 
Intervals.of("2023/2025"));
+    validator.expectLockNotGranted(TaskLockType.SHARED, task, 
Intervals.of("2023/2025"));
+    validator.expectLockNotGranted(TaskLockType.REPLACE, task, 
Intervals.of("2023/2025"));
+    validator.expectLockNotGranted(TaskLockType.APPEND, task, 
Intervals.of("2023/2025"));
+
+    // Locks must not be granted when the task group is different
+    validator.expectLockNotGranted(TaskLockType.EXCLUSIVE, otherGroupTask, 
Intervals.of("2024/2025"));
+    validator.expectLockNotGranted(TaskLockType.SHARED, otherGroupTask, 
Intervals.of("2024/2025"));
+    validator.expectLockNotGranted(TaskLockType.REPLACE, otherGroupTask, 
Intervals.of("2024/2025"));
+    validator.expectLockNotGranted(TaskLockType.APPEND, otherGroupTask, 
Intervals.of("2024/2025"));
+  }
+
   @Test
   public void testGetLockedIntervalsForRevokedLocks()
   {
@@ -1977,7 +2005,7 @@ public class TaskLockboxTest
   private class TaskLockboxValidator
   {
 
-    private final List<Task> tasks;
+    private final Set<Task> tasks;
     private final TaskLockbox lockbox;
     private final TaskStorage taskStorage;
     private final Map<TaskLock, String> lockToTaskIdMap;
@@ -1985,11 +2013,19 @@ public class TaskLockboxTest
     TaskLockboxValidator(TaskLockbox lockbox, TaskStorage taskStorage)
     {
       lockToTaskIdMap = new HashMap<>();
-      tasks = new ArrayList<>();
+      tasks = new HashSet<>();
       this.lockbox = lockbox;
       this.taskStorage = taskStorage;
     }
 
+    public TaskLock expectLockCreated(TaskLockType type, Task task, Interval 
interval)
+    {
+      final TaskLock lock = tryTaskLock(type, task, interval);
+      Assert.assertNotNull(lock);
+      Assert.assertFalse(lock.isRevoked());
+      return lock;
+    }
+
     public TaskLock expectLockCreated(TaskLockType type, Interval interval, 
int priority)
     {
       final TaskLock lock = tryTaskLock(type, interval, priority);
@@ -2003,6 +2039,12 @@ public class TaskLockboxTest
       lockbox.revokeLock(lockToTaskIdMap.get(lock), lock);
     }
 
+    public void expectLockNotGranted(TaskLockType type, Task task, Interval 
interval)
+    {
+      final TaskLock lock = tryTaskLock(type, task, interval);
+      Assert.assertNull(lock);
+    }
+
     public void expectLockNotGranted(TaskLockType type, Interval interval, int 
priority)
     {
       final TaskLock lock = tryTaskLock(type, interval, priority);
@@ -2031,12 +2073,12 @@ public class TaskLockboxTest
       }
     }
 
-    private TaskLock tryTaskLock(TaskLockType type, Interval interval, int 
priority)
+    private TaskLock tryTaskLock(TaskLockType type, Task task, Interval 
interval)
     {
-      final Task task = NoopTask.ofPriority(priority);
-      tasks.add(task);
-      lockbox.add(task);
-      taskStorage.insert(task, TaskStatus.running(task.getId()));
+      if (tasks.add(task)) {
+        lockbox.add(task);
+        taskStorage.insert(task, TaskStatus.running(task.getId()));
+      }
       TaskLock lock = tryTimeChunkLock(type, task, interval).getTaskLock();
       if (lock != null) {
         lockToTaskIdMap.put(lock, task.getId());
@@ -2044,6 +2086,11 @@ public class TaskLockboxTest
       return lock;
     }
 
+    private TaskLock tryTaskLock(TaskLockType type, Interval interval, int 
priority)
+    {
+      return tryTaskLock(type, NoopTask.ofPriority(priority), interval);
+    }
+
     private Set<TaskLock> getAllActiveLocks()
     {
       return tasks.stream()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to