This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 33fdd770f70 Consider only supervisors with append lock for concurrent 
transactional replace (#15220)
33fdd770f70 is described below

commit 33fdd770f7048f4209f01441086847e336d7fe94
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Sun Oct 22 14:12:36 2023 +0530

    Consider only supervisors with append lock for concurrent transactional 
replace (#15220)
    
    A SegmentTransactionReplaceAction must only update the mapping of tasks 
with append locks that are running concurrently. To ensure this, we return the 
supervisor id only if it has the taskLockType as APPEND in its context.
---
 .../actions/SegmentTransactionalReplaceAction.java | 11 +++-
 .../overlord/supervisor/SupervisorManager.java     | 27 ++++++++-
 .../overlord/supervisor/SupervisorManagerTest.java | 67 ++++++++++++++++++++++
 3 files changed, 100 insertions(+), 5 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
index 5a2b3ceec8f..e6ad0426e46 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
@@ -139,8 +139,9 @@ public class SegmentTransactionalReplaceAction implements 
TaskAction<SegmentPubl
   private void tryUpgradeOverlappingPendingSegments(Task task, 
TaskActionToolbox toolbox)
   {
     final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
-    final Optional<String> activeSupervisorId = 
supervisorManager.getActiveSupervisorIdForDatasource(task.getDataSource());
-    if (!activeSupervisorId.isPresent()) {
+    final Optional<String> activeSupervisorIdWithAppendLock =
+        
supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource());
+    if (!activeSupervisorIdWithAppendLock.isPresent()) {
       return;
     }
 
@@ -153,7 +154,11 @@ public class SegmentTransactionalReplaceAction implements 
TaskAction<SegmentPubl
 
     upgradedPendingSegments.forEach(
         (oldId, newId) -> toolbox.getSupervisorManager()
-                                 
.registerNewVersionOfPendingSegmentOnSupervisor(activeSupervisorId.get(), 
oldId, newId)
+                                 
.registerNewVersionOfPendingSegmentOnSupervisor(
+                                     activeSupervisorIdWithAppendLock.get(),
+                                     oldId,
+                                     newId
+                                 )
     );
   }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index d55f3cc8bd0..df454c1011a 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -22,14 +22,18 @@ package org.apache.druid.indexing.overlord.supervisor;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.metadata.MetadataSupervisorManager;
+import org.apache.druid.query.QueryContexts;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 
@@ -71,15 +75,34 @@ public class SupervisorManager
     return supervisors.keySet();
   }
 
-  public Optional<String> getActiveSupervisorIdForDatasource(String datasource)
+  /**
+   * @param datasource Datasource to find active supervisor id with append 
lock for.
+   * @return An optional with the active appending supervisor id if it exists.
+   */
+  public Optional<String> 
getActiveSupervisorIdForDatasourceWithAppendLock(String datasource)
   {
     for (Map.Entry<String, Pair<Supervisor, SupervisorSpec>> entry : 
supervisors.entrySet()) {
       final String supervisorId = entry.getKey();
       final Supervisor supervisor = entry.getValue().lhs;
       final SupervisorSpec supervisorSpec = entry.getValue().rhs;
+
+      TaskLockType taskLockType = null;
+      if (supervisorSpec instanceof SeekableStreamSupervisorSpec) {
+        SeekableStreamSupervisorSpec seekableStreamSupervisorSpec = 
(SeekableStreamSupervisorSpec) supervisorSpec;
+        Map<String, Object> context = 
seekableStreamSupervisorSpec.getContext();
+        if (context != null) {
+          taskLockType = QueryContexts.getAsEnum(
+              Tasks.TASK_LOCK_TYPE,
+              context.get(Tasks.TASK_LOCK_TYPE),
+              TaskLockType.class
+          );
+        }
+      }
+
       if (supervisor instanceof SeekableStreamSupervisor
           && !supervisorSpec.isSuspended()
-          && supervisorSpec.getDataSources().contains(datasource)) {
+          && supervisorSpec.getDataSources().contains(datasource)
+          && TaskLockType.APPEND.equals(taskLockType)) {
         return Optional.of(supervisorId);
       }
     }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index ba4b963e9b3..e8c5d839cf1 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -23,9 +23,13 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import 
org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.metadata.MetadataSupervisorManager;
 import org.easymock.Capture;
@@ -434,6 +438,69 @@ public class SupervisorManagerTest extends EasyMockSupport
     Assert.assertTrue(manager.getSupervisorIds().isEmpty());
   }
 
+  @Test
+  public void testGetActiveSupervisorIdForDatasourceWithAppendLock()
+  {
+    
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(Collections.emptyMap());
+
+    NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec("noop", 
ImmutableList.of("noopDS"));
+    metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
+
+    SeekableStreamSupervisorSpec suspendedSpec = 
EasyMock.mock(SeekableStreamSupervisorSpec.class);
+    Supervisor suspendedSupervisor = 
EasyMock.mock(SeekableStreamSupervisor.class);
+    
EasyMock.expect(suspendedSpec.getId()).andReturn("suspendedSpec").anyTimes();
+    EasyMock.expect(suspendedSpec.isSuspended()).andReturn(true).anyTimes();
+    
EasyMock.expect(suspendedSpec.getDataSources()).andReturn(ImmutableList.of("suspendedDS")).anyTimes();
+    
EasyMock.expect(suspendedSpec.createSupervisor()).andReturn(suspendedSupervisor).anyTimes();
+    
EasyMock.expect(suspendedSpec.createAutoscaler(suspendedSupervisor)).andReturn(null).anyTimes();
+    EasyMock.expect(suspendedSpec.getContext()).andReturn(null).anyTimes();
+    EasyMock.replay(suspendedSpec);
+    metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
+
+    SeekableStreamSupervisorSpec activeSpec = 
EasyMock.mock(SeekableStreamSupervisorSpec.class);
+    Supervisor activeSupervisor = 
EasyMock.mock(SeekableStreamSupervisor.class);
+    EasyMock.expect(activeSpec.getId()).andReturn("activeSpec").anyTimes();
+    EasyMock.expect(activeSpec.isSuspended()).andReturn(false).anyTimes();
+    
EasyMock.expect(activeSpec.getDataSources()).andReturn(ImmutableList.of("activeDS")).anyTimes();
+    
EasyMock.expect(activeSpec.createSupervisor()).andReturn(activeSupervisor).anyTimes();
+    
EasyMock.expect(activeSpec.createAutoscaler(activeSupervisor)).andReturn(null).anyTimes();
+    EasyMock.expect(activeSpec.getContext()).andReturn(null).anyTimes();
+    EasyMock.replay(activeSpec);
+    metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
+
+    SeekableStreamSupervisorSpec activeAppendSpec = 
EasyMock.mock(SeekableStreamSupervisorSpec.class);
+    Supervisor activeAppendSupervisor = 
EasyMock.mock(SeekableStreamSupervisor.class);
+    
EasyMock.expect(activeAppendSpec.getId()).andReturn("activeAppendSpec").anyTimes();
+    
EasyMock.expect(activeAppendSpec.isSuspended()).andReturn(false).anyTimes();
+    
EasyMock.expect(activeAppendSpec.getDataSources()).andReturn(ImmutableList.of("activeAppendDS")).anyTimes();
+    
EasyMock.expect(activeAppendSpec.createSupervisor()).andReturn(activeAppendSupervisor).anyTimes();
+    
EasyMock.expect(activeAppendSpec.createAutoscaler(activeAppendSupervisor)).andReturn(null).anyTimes();
+    EasyMock.expect(activeAppendSpec.getContext()).andReturn(ImmutableMap.of(
+        Tasks.TASK_LOCK_TYPE,
+        TaskLockType.APPEND.name()
+    )).anyTimes();
+    EasyMock.replay(activeAppendSpec);
+    metadataSupervisorManager.insert(EasyMock.anyString(), 
EasyMock.anyObject());
+
+    replayAll();
+    manager.start();
+
+    
Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("nonExistent").isPresent());
+
+    manager.createOrUpdateAndStartSupervisor(noopSupervisorSpec);
+    
Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("noopDS").isPresent());
+
+    manager.createOrUpdateAndStartSupervisor(suspendedSpec);
+    
Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("suspendedDS").isPresent());
+
+    manager.createOrUpdateAndStartSupervisor(activeSpec);
+    
Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeDS").isPresent());
+
+    manager.createOrUpdateAndStartSupervisor(activeAppendSpec);
+    
Assert.assertTrue(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeAppendDS").isPresent());
+
+    verifyAll();
+  }
 
   private static class TestSupervisorSpec implements SupervisorSpec
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to