This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch 29.0.1
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/29.0.1 by this push:
new 33478f08265 [Backport] Concurrent replace should work with supervisors
using concurrent locks (#15995) (#16019)
33478f08265 is described below
commit 33478f0826537fd618dc3fd8776b1173f9a3ecdf
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Fri Mar 1 12:03:07 2024 +0530
[Backport] Concurrent replace should work with supervisors using concurrent
locks (#15995) (#16019)
* Concurrent replace should work with supervisors using concurrent locks
* Ignore supervisors with useConcurrentLocks set to false
* Apply feedback
---
.../overlord/supervisor/SupervisorManager.java | 27 ++++++++++----
.../overlord/supervisor/SupervisorManagerTest.java | 42 ++++++++++++++++++++++
2 files changed, 63 insertions(+), 6 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 207ff56f28f..810a991c2f2 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -87,23 +87,38 @@ public class SupervisorManager
final Supervisor supervisor = entry.getValue().lhs;
final SupervisorSpec supervisorSpec = entry.getValue().rhs;
- TaskLockType taskLockType = null;
+ boolean hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS;
if (supervisorSpec instanceof SeekableStreamSupervisorSpec) {
SeekableStreamSupervisorSpec seekableStreamSupervisorSpec =
(SeekableStreamSupervisorSpec) supervisorSpec;
Map<String, Object> context =
seekableStreamSupervisorSpec.getContext();
if (context != null) {
- taskLockType = QueryContexts.getAsEnum(
- Tasks.TASK_LOCK_TYPE,
- context.get(Tasks.TASK_LOCK_TYPE),
- TaskLockType.class
+ Boolean useConcurrentLocks = QueryContexts.getAsBoolean(
+ Tasks.USE_CONCURRENT_LOCKS,
+ context.get(Tasks.USE_CONCURRENT_LOCKS)
);
+ if (useConcurrentLocks == null) {
+ TaskLockType taskLockType = QueryContexts.getAsEnum(
+ Tasks.TASK_LOCK_TYPE,
+ context.get(Tasks.TASK_LOCK_TYPE),
+ TaskLockType.class
+ );
+ if (taskLockType == null) {
+ hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS;
+ } else if (taskLockType == TaskLockType.APPEND) {
+ hasAppendLock = true;
+ } else {
+ hasAppendLock = false;
+ }
+ } else {
+ hasAppendLock = useConcurrentLocks;
+ }
}
}
if (supervisor instanceof SeekableStreamSupervisor
&& !supervisorSpec.isSuspended()
&& supervisorSpec.getDataSources().contains(datasource)
- && TaskLockType.APPEND.equals(taskLockType)) {
+ && (hasAppendLock)) {
return Optional.of(supervisorId);
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index e8c5d839cf1..5ffbd4b9460 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -468,6 +468,21 @@ public class SupervisorManagerTest extends EasyMockSupport
EasyMock.replay(activeSpec);
metadataSupervisorManager.insert(EasyMock.anyString(),
EasyMock.anyObject());
+ SeekableStreamSupervisorSpec activeSpecWithConcurrentLocks =
EasyMock.mock(SeekableStreamSupervisorSpec.class);
+ Supervisor activeSupervisorWithConcurrentLocks =
EasyMock.mock(SeekableStreamSupervisor.class);
+
EasyMock.expect(activeSpecWithConcurrentLocks.getId()).andReturn("activeSpecWithConcurrentLocks").anyTimes();
+
EasyMock.expect(activeSpecWithConcurrentLocks.isSuspended()).andReturn(false).anyTimes();
+ EasyMock.expect(activeSpecWithConcurrentLocks.getDataSources())
+ .andReturn(ImmutableList.of("activeConcurrentLocksDS")).anyTimes();
+ EasyMock.expect(activeSpecWithConcurrentLocks.createSupervisor())
+ .andReturn(activeSupervisorWithConcurrentLocks).anyTimes();
+
EasyMock.expect(activeSpecWithConcurrentLocks.createAutoscaler(activeSupervisorWithConcurrentLocks))
+ .andReturn(null).anyTimes();
+ EasyMock.expect(activeSpecWithConcurrentLocks.getContext())
+ .andReturn(ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS,
true)).anyTimes();
+ EasyMock.replay(activeSpecWithConcurrentLocks);
+ metadataSupervisorManager.insert(EasyMock.anyString(),
EasyMock.anyObject());
+
SeekableStreamSupervisorSpec activeAppendSpec =
EasyMock.mock(SeekableStreamSupervisorSpec.class);
Supervisor activeAppendSupervisor =
EasyMock.mock(SeekableStreamSupervisor.class);
EasyMock.expect(activeAppendSpec.getId()).andReturn("activeAppendSpec").anyTimes();
@@ -482,6 +497,25 @@ public class SupervisorManagerTest extends EasyMockSupport
EasyMock.replay(activeAppendSpec);
metadataSupervisorManager.insert(EasyMock.anyString(),
EasyMock.anyObject());
+ // A supervisor with useConcurrentLocks set to false explicitly must not
use an append lock
+ SeekableStreamSupervisorSpec specWithUseConcurrentLocksFalse =
EasyMock.mock(SeekableStreamSupervisorSpec.class);
+ Supervisor supervisorWithUseConcurrentLocksFalse =
EasyMock.mock(SeekableStreamSupervisor.class);
+
EasyMock.expect(specWithUseConcurrentLocksFalse.getId()).andReturn("useConcurrentLocksFalse").anyTimes();
+
EasyMock.expect(specWithUseConcurrentLocksFalse.isSuspended()).andReturn(false).anyTimes();
+ EasyMock.expect(specWithUseConcurrentLocksFalse.getDataSources())
+
.andReturn(ImmutableList.of("dsWithuseConcurrentLocksFalse")).anyTimes();
+
EasyMock.expect(specWithUseConcurrentLocksFalse.createSupervisor()).andReturn(supervisorWithUseConcurrentLocksFalse).anyTimes();
+
EasyMock.expect(specWithUseConcurrentLocksFalse.createAutoscaler(supervisorWithUseConcurrentLocksFalse))
+ .andReturn(null).anyTimes();
+
EasyMock.expect(specWithUseConcurrentLocksFalse.getContext()).andReturn(ImmutableMap.of(
+ Tasks.USE_CONCURRENT_LOCKS,
+ false,
+ Tasks.TASK_LOCK_TYPE,
+ TaskLockType.APPEND.name()
+ )).anyTimes();
+ EasyMock.replay(specWithUseConcurrentLocksFalse);
+ metadataSupervisorManager.insert(EasyMock.anyString(),
EasyMock.anyObject());
+
replayAll();
manager.start();
@@ -499,6 +533,14 @@ public class SupervisorManagerTest extends EasyMockSupport
manager.createOrUpdateAndStartSupervisor(activeAppendSpec);
Assert.assertTrue(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeAppendDS").isPresent());
+ manager.createOrUpdateAndStartSupervisor(activeSpecWithConcurrentLocks);
+
Assert.assertTrue(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeConcurrentLocksDS").isPresent());
+
+ manager.createOrUpdateAndStartSupervisor(specWithUseConcurrentLocksFalse);
+ Assert.assertFalse(
+
manager.getActiveSupervisorIdForDatasourceWithAppendLock("dsWithUseConcurrentLocksFalse").isPresent()
+ );
+
verifyAll();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]