This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch 29.0.1
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/29.0.1 by this push:
     new 33478f08265 [Backport] Concurrent replace should work with supervisors using concurrent locks (#15995) (#16019)
33478f08265 is described below

commit 33478f0826537fd618dc3fd8776b1173f9a3ecdf
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Fri Mar 1 12:03:07 2024 +0530

    [Backport] Concurrent replace should work with supervisors using concurrent locks (#15995) (#16019)
    
    * Concurrent replace should work with supervisors using concurrent locks
    
    * Ignore supervisors with useConcurrentLocks set to false
    
    * Apply feedback
---
 .../overlord/supervisor/SupervisorManager.java     | 27 ++++++++++----
 .../overlord/supervisor/SupervisorManagerTest.java | 42 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 6 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 207ff56f28f..810a991c2f2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -87,23 +87,38 @@ public class SupervisorManager
       final Supervisor supervisor = entry.getValue().lhs;
       final SupervisorSpec supervisorSpec = entry.getValue().rhs;
 
-      TaskLockType taskLockType = null;
+      boolean hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS;
       if (supervisorSpec instanceof SeekableStreamSupervisorSpec) {
         SeekableStreamSupervisorSpec seekableStreamSupervisorSpec = 
(SeekableStreamSupervisorSpec) supervisorSpec;
         Map<String, Object> context = 
seekableStreamSupervisorSpec.getContext();
         if (context != null) {
-          taskLockType = QueryContexts.getAsEnum(
-              Tasks.TASK_LOCK_TYPE,
-              context.get(Tasks.TASK_LOCK_TYPE),
-              TaskLockType.class
+          Boolean useConcurrentLocks = QueryContexts.getAsBoolean(
+              Tasks.USE_CONCURRENT_LOCKS,
+              context.get(Tasks.USE_CONCURRENT_LOCKS)
           );
+          if (useConcurrentLocks == null) {
+            TaskLockType taskLockType = QueryContexts.getAsEnum(
+                Tasks.TASK_LOCK_TYPE,
+                context.get(Tasks.TASK_LOCK_TYPE),
+                TaskLockType.class
+            );
+            if (taskLockType == null) {
+              hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS;
+            } else if (taskLockType == TaskLockType.APPEND) {
+              hasAppendLock = true;
+            } else {
+              hasAppendLock = false;
+            }
+          } else {
+            hasAppendLock = useConcurrentLocks;
+          }
         }
       }
 
       if (supervisor instanceof SeekableStreamSupervisor
           && !supervisorSpec.isSuspended()
           && supervisorSpec.getDataSources().contains(datasource)
-          && TaskLockType.APPEND.equals(taskLockType)) {
+          && (hasAppendLock)) {
         return Optional.of(supervisorId);
       }
     }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index e8c5d839cf1..5ffbd4b9460 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -468,6 +468,21 @@ public class SupervisorManagerTest extends EasyMockSupport
     EasyMock.replay(activeSpec);
     metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
 
+    SeekableStreamSupervisorSpec activeSpecWithConcurrentLocks = 
EasyMock.mock(SeekableStreamSupervisorSpec.class);
+    Supervisor activeSupervisorWithConcurrentLocks = 
EasyMock.mock(SeekableStreamSupervisor.class);
+    
EasyMock.expect(activeSpecWithConcurrentLocks.getId()).andReturn("activeSpecWithConcurrentLocks").anyTimes();
+    
EasyMock.expect(activeSpecWithConcurrentLocks.isSuspended()).andReturn(false).anyTimes();
+    EasyMock.expect(activeSpecWithConcurrentLocks.getDataSources())
+            .andReturn(ImmutableList.of("activeConcurrentLocksDS")).anyTimes();
+    EasyMock.expect(activeSpecWithConcurrentLocks.createSupervisor())
+            .andReturn(activeSupervisorWithConcurrentLocks).anyTimes();
+    
EasyMock.expect(activeSpecWithConcurrentLocks.createAutoscaler(activeSupervisorWithConcurrentLocks))
+            .andReturn(null).anyTimes();
+    EasyMock.expect(activeSpecWithConcurrentLocks.getContext())
+            .andReturn(ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, 
true)).anyTimes();
+    EasyMock.replay(activeSpecWithConcurrentLocks);
+    metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
+
     SeekableStreamSupervisorSpec activeAppendSpec = 
EasyMock.mock(SeekableStreamSupervisorSpec.class);
     Supervisor activeAppendSupervisor = 
EasyMock.mock(SeekableStreamSupervisor.class);
     
EasyMock.expect(activeAppendSpec.getId()).andReturn("activeAppendSpec").anyTimes();
@@ -482,6 +497,25 @@ public class SupervisorManagerTest extends EasyMockSupport
     EasyMock.replay(activeAppendSpec);
     metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
 
+    // A supervisor with useConcurrentLocks set to false explicitly must not use an append lock
+    SeekableStreamSupervisorSpec specWithUseConcurrentLocksFalse = 
EasyMock.mock(SeekableStreamSupervisorSpec.class);
+    Supervisor supervisorWithUseConcurrentLocksFalse = 
EasyMock.mock(SeekableStreamSupervisor.class);
+    
EasyMock.expect(specWithUseConcurrentLocksFalse.getId()).andReturn("useConcurrentLocksFalse").anyTimes();
+    
EasyMock.expect(specWithUseConcurrentLocksFalse.isSuspended()).andReturn(false).anyTimes();
+    EasyMock.expect(specWithUseConcurrentLocksFalse.getDataSources())
+            
.andReturn(ImmutableList.of("dsWithuseConcurrentLocksFalse")).anyTimes();
+    
EasyMock.expect(specWithUseConcurrentLocksFalse.createSupervisor()).andReturn(supervisorWithUseConcurrentLocksFalse).anyTimes();
+    
EasyMock.expect(specWithUseConcurrentLocksFalse.createAutoscaler(supervisorWithUseConcurrentLocksFalse))
+            .andReturn(null).anyTimes();
+    
EasyMock.expect(specWithUseConcurrentLocksFalse.getContext()).andReturn(ImmutableMap.of(
+        Tasks.USE_CONCURRENT_LOCKS,
+        false,
+        Tasks.TASK_LOCK_TYPE,
+        TaskLockType.APPEND.name()
+    )).anyTimes();
+    EasyMock.replay(specWithUseConcurrentLocksFalse);
+    metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
+
     replayAll();
     manager.start();
 
@@ -499,6 +533,14 @@ public class SupervisorManagerTest extends EasyMockSupport
     manager.createOrUpdateAndStartSupervisor(activeAppendSpec);
     
Assert.assertTrue(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeAppendDS").isPresent());
 
+    manager.createOrUpdateAndStartSupervisor(activeSpecWithConcurrentLocks);
+    
Assert.assertTrue(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeConcurrentLocksDS").isPresent());
+
+    manager.createOrUpdateAndStartSupervisor(specWithUseConcurrentLocksFalse);
+    Assert.assertFalse(
+        
manager.getActiveSupervisorIdForDatasourceWithAppendLock("dsWithUseConcurrentLocksFalse").isPresent()
+    );
+
     verifyAll();
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to