This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 24654b1ccc0 Fix concurrent replace single phase tasks  (#18099)
24654b1ccc0 is described below

commit 24654b1ccc0cda31a7470595871e7766a0f8fbeb
Author: Misha <[email protected]>
AuthorDate: Tue Jun 10 13:53:37 2025 +0200

    Fix concurrent replace single phase tasks  (#18099)
    
    Changes:
    
    - Use REPLACE lock in single phase tasks when appendToExisting is false
    - Update tests to verify the change
---
 .../common/task/AbstractBatchIndexTask.java        |  2 +-
 .../parallel/ParallelIndexSupervisorTask.java      | 14 ++++---
 .../SinglePhaseParallelIndexTaskRunner.java        | 46 ++++++----------------
 .../ParallelIndexSupervisorTaskKillTest.java       |  7 +---
 .../ParallelIndexSupervisorTaskResourceTest.java   |  7 +---
 .../parallel/SinglePhaseParallelIndexingTest.java  | 15 ++++---
 6 files changed, 33 insertions(+), 58 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 95ca4febaf2..97dc70f3357 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -535,7 +535,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
   /**
    * Determines the type of lock to use with the given lock granularity.
    */
-  private TaskLockType determineLockType(LockGranularity lockGranularity)
+  public TaskLockType determineLockType(LockGranularity lockGranularity)
   {
     if (lockGranularity == LockGranularity.SEGMENT) {
       return TaskLockType.EXCLUSIVE;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 1d134f81ddb..9faa4c2013e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -310,6 +310,13 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
     return ingestionSchema;
   }
 
+  @Nullable
+  @JsonIgnore
+  public String getBaseSubtaskSpecName()
+  {
+    return baseSubtaskSpecName;
+  }
+
   @VisibleForTesting
   @Nullable
   ParallelIndexTaskRunner getCurrentRunner()
@@ -351,12 +358,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
   {
     return new SinglePhaseParallelIndexTaskRunner(
         toolbox,
-        getId(),
-        getGroupId(),
-        baseSubtaskSpecName,
-        ingestionSchema,
-        getContext(),
-        toolbox.getCentralizedTableSchemaConfig()
+        this
     );
   }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
index a2a29b3cdd3..4ecda626cab 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
@@ -28,13 +28,11 @@ import org.apache.druid.indexing.common.Counters;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.common.actions.TaskLocks;
 import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.SubTaskCompleteEvent;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.NonnullPair;
-import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
@@ -101,42 +99,24 @@ public class SinglePhaseParallelIndexTaskRunner extends ParallelIndexPhaseRunner
 
   private final ParallelIndexIngestionSpec ingestionSchema;
   private final SplittableInputSource<?> baseInputSource;
-  private CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;
+  private final ParallelIndexSupervisorTask supervisorTask;
 
   SinglePhaseParallelIndexTaskRunner(
       TaskToolbox toolbox,
-      String taskId,
-      String groupId,
-      String baseSubtaskSpecName,
-      ParallelIndexIngestionSpec ingestionSchema,
-      Map<String, Object> context,
-      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+      ParallelIndexSupervisorTask supervisorTask
   )
   {
     super(
         toolbox,
-        taskId,
-        groupId,
-        baseSubtaskSpecName,
-        ingestionSchema.getTuningConfig(),
-        context
+        supervisorTask.getId(),
+        supervisorTask.getGroupId(),
+        supervisorTask.getBaseSubtaskSpecName(),
+        supervisorTask.getIngestionSchema().getTuningConfig(),
+        supervisorTask.getContext()
     );
-    this.ingestionSchema = ingestionSchema;
+    this.ingestionSchema = supervisorTask.getIngestionSchema();
     this.baseInputSource = (SplittableInputSource) ingestionSchema.getIOConfig().getNonNullInputSource(toolbox);
-    this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
-  }
-
-  @VisibleForTesting
-  SinglePhaseParallelIndexTaskRunner(
-      TaskToolbox toolbox,
-      String taskId,
-      String groupId,
-      ParallelIndexIngestionSpec ingestionSchema,
-      Map<String, Object> context,
-      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
-  )
-  {
-    this(toolbox, taskId, groupId, taskId, ingestionSchema, context, centralizedDatasourceSchemaConfig);
+    this.supervisorTask = supervisorTask;
   }
 
   @Override
@@ -207,7 +187,7 @@ public class SinglePhaseParallelIndexTaskRunner extends ParallelIndexPhaseRunner
   @Deprecated
   public SegmentIdWithShardSpec allocateNewSegment(String dataSource, DateTime timestamp) throws IOException
   {
-    NonnullPair<Interval, String> intervalAndVersion = findIntervalAndVersion(timestamp);
+    NonnullPair<Interval, String> intervalAndVersion = findIntervalAndVersion(timestamp, supervisorTask.getTaskLockHelper().getLockGranularityToUse());
 
     final int partitionNum = Counters.getAndIncrementInt(partitionNumCountersPerInterval, intervalAndVersion.lhs);
     return new SegmentIdWithShardSpec(
@@ -237,7 +217,7 @@ public class SinglePhaseParallelIndexTaskRunner extends ParallelIndexPhaseRunner
       @Nullable String prevSegmentId
   ) throws IOException
   {
-    NonnullPair<Interval, String> intervalAndVersion = findIntervalAndVersion(timestamp);
+    NonnullPair<Interval, String> intervalAndVersion = findIntervalAndVersion(timestamp, LockGranularity.TIME_CHUNK);
 
     MutableObject<SegmentIdWithShardSpec> segmentIdHolder = new MutableObject<>();
     sequenceToSegmentIds.compute(sequenceName, (k, v) -> {
@@ -285,9 +265,9 @@ public class SinglePhaseParallelIndexTaskRunner extends ParallelIndexPhaseRunner
     return segmentIdHolder.getValue();
   }
 
-  NonnullPair<Interval, String> findIntervalAndVersion(DateTime timestamp) throws IOException
+  NonnullPair<Interval, String> findIntervalAndVersion(DateTime timestamp, LockGranularity granularity) throws IOException
   {
-    TaskLockType taskLockType = TaskLocks.determineLockTypeForAppend(getContext());
+    TaskLockType taskLockType = supervisorTask.determineLockType(granularity);
     return AbstractBatchIndexTask.findIntervalAndVersion(getToolbox(), ingestionSchema, timestamp, taskLockType);
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
index 8f5d5450dcf..6a482195abd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
@@ -39,7 +39,6 @@ import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;
@@ -256,11 +255,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
     {
       super(
           toolbox,
-          supervisorTask.getId(),
-          supervisorTask.getGroupId(),
-          supervisorTask.getIngestionSchema(),
-          supervisorTask.getContext(),
-          CentralizedDatasourceSchemaConfig.create()
+          supervisorTask
       );
       this.supervisorTask = supervisorTask;
     }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
index e8fa4ad1307..b3e47776407 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
@@ -48,7 +48,6 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.server.security.AuthConfig;
@@ -495,11 +494,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
     {
       super(
           toolbox,
-          supervisorTask.getId(),
-          supervisorTask.getGroupId(),
-          supervisorTask.getIngestionSchema(),
-          supervisorTask.getContext(),
-          CentralizedDatasourceSchemaConfig.enabled(true)
+          supervisorTask
       );
       this.supervisorTask = supervisorTask;
     }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index fe573bf0f63..cc3cdffa747 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -93,13 +93,13 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
 
-  @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, useSegmentCache={2}")
+  @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, useSegmentCache={2}, useConcurrentLocks={3}")
   public static Iterable<Object[]> constructorFeeder()
   {
     return ImmutableList.of(
-        new Object[]{LockGranularity.TIME_CHUNK, false, false},
-        new Object[]{LockGranularity.TIME_CHUNK, true, true},
-        new Object[]{LockGranularity.SEGMENT, true, false}
+        new Object[]{LockGranularity.TIME_CHUNK, false, false, false},
+        new Object[]{LockGranularity.TIME_CHUNK, true, true, true},
+        new Object[]{LockGranularity.SEGMENT, true, false, true}
     );
   }
 
@@ -108,18 +108,21 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
 
   private final LockGranularity lockGranularity;
   private final boolean useInputFormatApi;
+  private final boolean useConcurrentLocks;
 
   private File inputDir;
 
   public SinglePhaseParallelIndexingTest(
       LockGranularity lockGranularity,
       boolean useInputFormatApi,
-      boolean useSegmentMetadataCache
+      boolean useSegmentMetadataCache,
+      boolean useConcurrentLocks
   )
   {
     super(DEFAULT_TRANSIENT_TASK_FAILURE_RATE, DEFAULT_TRANSIENT_API_FAILURE_RATE, useSegmentMetadataCache);
     this.lockGranularity = lockGranularity;
     this.useInputFormatApi = useInputFormatApi;
+    this.useConcurrentLocks = useConcurrentLocks;
   }
 
   @Before
@@ -1004,7 +1007,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
         null,
         null,
         ingestionSpec,
-        Collections.emptyMap()
+        Map.of(Tasks.USE_CONCURRENT_LOCKS, useConcurrentLocks)
     );
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to