This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch 30.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/30.0.0 by this push:
new 108709a907b Allow different timechunk lock types to coexist in a task
group (#16369) (#16373)
108709a907b is described below
commit 108709a907bc33c5c22728e413038d0743bc2acc
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Thu May 2 23:23:25 2024 +0530
Allow different timechunk lock types to coexist in a task group (#16369)
(#16373)
Description:
All the streaming ingestion tasks for a given datasource share the same
lock for a given interval.
Changing the lock type in the supervisor can lead to segment allocation
errors for the new tasks, because their lock requests conflict with the
locks held by the older tasks that are still running.
Fix:
Allow locks of different types (EXCLUSIVE, SHARED, APPEND, REPLACE) to
coexist if they have the same interval and the same task group.
---
.../druid/indexing/overlord/TaskLockbox.java | 16 +++---
.../druid/indexing/overlord/TaskLockboxTest.java | 61 +++++++++++++++++++---
2 files changed, 63 insertions(+), 14 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 1a1369ba1e5..1155592f325 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -579,18 +579,20 @@ public class TaskLockbox
|| revokeAllIncompatibleActiveLocksIfPossible(conflictPosses,
request)) {
posseToUse = createNewTaskLockPosse(request);
} else {
- // During a rolling update, tasks of mixed versions can be run at
the same time. Old tasks would request
- // timeChunkLocks while new tasks would ask segmentLocks. The
below check is to allow for old and new tasks
- // to get locks of different granularities if they have the same
groupId.
- final boolean allDifferentGranularity = conflictPosses
+ // When a rolling upgrade happens or lock types are changed for an
ongoing Streaming ingestion supervisor,
+ // the existing tasks might have or request different lock
granularities or types than the new ones.
+ // To ensure a smooth transition, we must allocate the different
lock types for the new tasks
+ // so that they can coexist and ingest with the required locks.
+ final boolean allLocksHaveSameTaskGroupAndInterval = conflictPosses
.stream()
.allMatch(
- conflictPosse -> conflictPosse.taskLock.getGranularity()
!= request.getGranularity()
- &&
conflictPosse.getTaskLock().getGroupId().equals(request.getGroupId())
+ conflictPosse ->
conflictPosse.getTaskLock().getGroupId().equals(request.getGroupId())
&&
conflictPosse.getTaskLock().getInterval().equals(request.getInterval())
);
- if (allDifferentGranularity) {
+
+ if (allLocksHaveSameTaskGroupAndInterval) {
// Lock collision was because of the different granularity in
the same group.
+ // OR because of different lock types for exclusive locks within
the same group
// We can add a new taskLockPosse.
posseToUse = createNewTaskLockPosse(request);
} else {
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 3c95709a9fc..e6b20dd073e 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -80,6 +80,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -1743,6 +1744,33 @@ public class TaskLockboxTest
validator.expectRevokedLocks(appendLock0, appendLock2, exclusiveLock,
replaceLock, sharedLock);
}
+ @Test
+ public void testTimechunkLockTypeTransitionForSameTaskGroup()
+ {
+ Task task = NoopTask.create();
+ Task otherGroupTask = NoopTask.create();
+
+ // Create an exclusive lock
+ validator.expectLockCreated(TaskLockType.EXCLUSIVE, task,
Intervals.of("2024/2025"));
+
+ // Verify that new locks are created for all other conflicting lock
requests for the same interval and group
+ validator.expectLockCreated(TaskLockType.SHARED, task,
Intervals.of("2024/2025"));
+ validator.expectLockCreated(TaskLockType.REPLACE, task,
Intervals.of("2024/2025"));
+ validator.expectLockCreated(TaskLockType.APPEND, task,
Intervals.of("2024/2025"));
+
+ // Conflicting locks for a different interval cannot be granted
+ validator.expectLockNotGranted(TaskLockType.EXCLUSIVE, task,
Intervals.of("2023/2025"));
+ validator.expectLockNotGranted(TaskLockType.SHARED, task,
Intervals.of("2023/2025"));
+ validator.expectLockNotGranted(TaskLockType.REPLACE, task,
Intervals.of("2023/2025"));
+ validator.expectLockNotGranted(TaskLockType.APPEND, task,
Intervals.of("2023/2025"));
+
+ // Locks must not be granted when the task group is different
+ validator.expectLockNotGranted(TaskLockType.EXCLUSIVE, otherGroupTask,
Intervals.of("2024/2025"));
+ validator.expectLockNotGranted(TaskLockType.SHARED, otherGroupTask,
Intervals.of("2024/2025"));
+ validator.expectLockNotGranted(TaskLockType.REPLACE, otherGroupTask,
Intervals.of("2024/2025"));
+ validator.expectLockNotGranted(TaskLockType.APPEND, otherGroupTask,
Intervals.of("2024/2025"));
+ }
+
@Test
public void testGetLockedIntervalsForRevokedLocks()
{
@@ -1977,7 +2005,7 @@ public class TaskLockboxTest
private class TaskLockboxValidator
{
- private final List<Task> tasks;
+ private final Set<Task> tasks;
private final TaskLockbox lockbox;
private final TaskStorage taskStorage;
private final Map<TaskLock, String> lockToTaskIdMap;
@@ -1985,11 +2013,19 @@ public class TaskLockboxTest
TaskLockboxValidator(TaskLockbox lockbox, TaskStorage taskStorage)
{
lockToTaskIdMap = new HashMap<>();
- tasks = new ArrayList<>();
+ tasks = new HashSet<>();
this.lockbox = lockbox;
this.taskStorage = taskStorage;
}
+ public TaskLock expectLockCreated(TaskLockType type, Task task, Interval
interval)
+ {
+ final TaskLock lock = tryTaskLock(type, task, interval);
+ Assert.assertNotNull(lock);
+ Assert.assertFalse(lock.isRevoked());
+ return lock;
+ }
+
public TaskLock expectLockCreated(TaskLockType type, Interval interval,
int priority)
{
final TaskLock lock = tryTaskLock(type, interval, priority);
@@ -2003,6 +2039,12 @@ public class TaskLockboxTest
lockbox.revokeLock(lockToTaskIdMap.get(lock), lock);
}
+ public void expectLockNotGranted(TaskLockType type, Task task, Interval
interval)
+ {
+ final TaskLock lock = tryTaskLock(type, task, interval);
+ Assert.assertNull(lock);
+ }
+
public void expectLockNotGranted(TaskLockType type, Interval interval, int
priority)
{
final TaskLock lock = tryTaskLock(type, interval, priority);
@@ -2031,12 +2073,12 @@ public class TaskLockboxTest
}
}
- private TaskLock tryTaskLock(TaskLockType type, Interval interval, int
priority)
+ private TaskLock tryTaskLock(TaskLockType type, Task task, Interval
interval)
{
- final Task task = NoopTask.ofPriority(priority);
- tasks.add(task);
- lockbox.add(task);
- taskStorage.insert(task, TaskStatus.running(task.getId()));
+ if (tasks.add(task)) {
+ lockbox.add(task);
+ taskStorage.insert(task, TaskStatus.running(task.getId()));
+ }
TaskLock lock = tryTimeChunkLock(type, task, interval).getTaskLock();
if (lock != null) {
lockToTaskIdMap.put(lock, task.getId());
@@ -2044,6 +2086,11 @@ public class TaskLockboxTest
return lock;
}
+ private TaskLock tryTaskLock(TaskLockType type, Interval interval, int
priority)
+ {
+ return tryTaskLock(type, NoopTask.ofPriority(priority), interval);
+ }
+
private Set<TaskLock> getAllActiveLocks()
{
return tasks.stream()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]