This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 24654b1ccc0 Fix concurrent replace single phase tasks (#18099)
24654b1ccc0 is described below
commit 24654b1ccc0cda31a7470595871e7766a0f8fbeb
Author: Misha <[email protected]>
AuthorDate: Tue Jun 10 13:53:37 2025 +0200
Fix concurrent replace single phase tasks (#18099)
Changes:
- Use a REPLACE lock in single-phase parallel tasks when appendToExisting is false
- Update tests to cover concurrent locks (new useConcurrentLocks test parameter)
---
.../common/task/AbstractBatchIndexTask.java | 2 +-
.../parallel/ParallelIndexSupervisorTask.java | 14 ++++---
.../SinglePhaseParallelIndexTaskRunner.java | 46 ++++++----------------
.../ParallelIndexSupervisorTaskKillTest.java | 7 +---
.../ParallelIndexSupervisorTaskResourceTest.java | 7 +---
.../parallel/SinglePhaseParallelIndexingTest.java | 15 ++++---
6 files changed, 33 insertions(+), 58 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 95ca4febaf2..97dc70f3357 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -535,7 +535,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
/**
* Determines the type of lock to use with the given lock granularity.
*/
- private TaskLockType determineLockType(LockGranularity lockGranularity)
+ public TaskLockType determineLockType(LockGranularity lockGranularity)
{
if (lockGranularity == LockGranularity.SEGMENT) {
return TaskLockType.EXCLUSIVE;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 1d134f81ddb..9faa4c2013e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -310,6 +310,13 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
return ingestionSchema;
}
+ @Nullable
+ @JsonIgnore
+ public String getBaseSubtaskSpecName()
+ {
+ return baseSubtaskSpecName;
+ }
+
@VisibleForTesting
@Nullable
ParallelIndexTaskRunner getCurrentRunner()
@@ -351,12 +358,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
{
return new SinglePhaseParallelIndexTaskRunner(
toolbox,
- getId(),
- getGroupId(),
- baseSubtaskSpecName,
- ingestionSchema,
- getContext(),
- toolbox.getCentralizedTableSchemaConfig()
+ this
);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
index a2a29b3cdd3..4ecda626cab 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
@@ -28,13 +28,11 @@ import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.Task;
import
org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.SubTaskCompleteEvent;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.NonnullPair;
-import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
@@ -101,42 +99,24 @@ public class SinglePhaseParallelIndexTaskRunner extends ParallelIndexPhaseRunner
private final ParallelIndexIngestionSpec ingestionSchema;
private final SplittableInputSource<?> baseInputSource;
- private CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;
+ private final ParallelIndexSupervisorTask supervisorTask;
SinglePhaseParallelIndexTaskRunner(
TaskToolbox toolbox,
- String taskId,
- String groupId,
- String baseSubtaskSpecName,
- ParallelIndexIngestionSpec ingestionSchema,
- Map<String, Object> context,
- CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+ ParallelIndexSupervisorTask supervisorTask
)
{
super(
toolbox,
- taskId,
- groupId,
- baseSubtaskSpecName,
- ingestionSchema.getTuningConfig(),
- context
+ supervisorTask.getId(),
+ supervisorTask.getGroupId(),
+ supervisorTask.getBaseSubtaskSpecName(),
+ supervisorTask.getIngestionSchema().getTuningConfig(),
+ supervisorTask.getContext()
);
- this.ingestionSchema = ingestionSchema;
+ this.ingestionSchema = supervisorTask.getIngestionSchema();
this.baseInputSource = (SplittableInputSource)
ingestionSchema.getIOConfig().getNonNullInputSource(toolbox);
- this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
- }
-
- @VisibleForTesting
- SinglePhaseParallelIndexTaskRunner(
- TaskToolbox toolbox,
- String taskId,
- String groupId,
- ParallelIndexIngestionSpec ingestionSchema,
- Map<String, Object> context,
- CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
- )
- {
- this(toolbox, taskId, groupId, taskId, ingestionSchema, context,
centralizedDatasourceSchemaConfig);
+ this.supervisorTask = supervisorTask;
}
@Override
@@ -207,7 +187,7 @@ public class SinglePhaseParallelIndexTaskRunner extends ParallelIndexPhaseRunner
@Deprecated
public SegmentIdWithShardSpec allocateNewSegment(String dataSource, DateTime
timestamp) throws IOException
{
- NonnullPair<Interval, String> intervalAndVersion =
findIntervalAndVersion(timestamp);
+ NonnullPair<Interval, String> intervalAndVersion =
findIntervalAndVersion(timestamp,
supervisorTask.getTaskLockHelper().getLockGranularityToUse());
final int partitionNum =
Counters.getAndIncrementInt(partitionNumCountersPerInterval,
intervalAndVersion.lhs);
return new SegmentIdWithShardSpec(
@@ -237,7 +217,7 @@ public class SinglePhaseParallelIndexTaskRunner extends ParallelIndexPhaseRunner
@Nullable String prevSegmentId
) throws IOException
{
- NonnullPair<Interval, String> intervalAndVersion =
findIntervalAndVersion(timestamp);
+ NonnullPair<Interval, String> intervalAndVersion =
findIntervalAndVersion(timestamp, LockGranularity.TIME_CHUNK);
MutableObject<SegmentIdWithShardSpec> segmentIdHolder = new
MutableObject<>();
sequenceToSegmentIds.compute(sequenceName, (k, v) -> {
@@ -285,9 +265,9 @@ public class SinglePhaseParallelIndexTaskRunner extends ParallelIndexPhaseRunner
return segmentIdHolder.getValue();
}
- NonnullPair<Interval, String> findIntervalAndVersion(DateTime timestamp)
throws IOException
+ NonnullPair<Interval, String> findIntervalAndVersion(DateTime timestamp,
LockGranularity granularity) throws IOException
{
- TaskLockType taskLockType =
TaskLocks.determineLockTypeForAppend(getContext());
+ TaskLockType taskLockType = supervisorTask.determineLockType(granularity);
return AbstractBatchIndexTask.findIntervalAndVersion(getToolbox(),
ingestionSchema, timestamp, taskLockType);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
index 8f5d5450dcf..6a482195abd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
@@ -39,7 +39,6 @@ import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
@@ -256,11 +255,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
{
super(
toolbox,
- supervisorTask.getId(),
- supervisorTask.getGroupId(),
- supervisorTask.getIngestionSchema(),
- supervisorTask.getContext(),
- CentralizedDatasourceSchemaConfig.create()
+ supervisorTask
);
this.supervisorTask = supervisorTask;
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
index e8fa4ad1307..b3e47776407 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
@@ -48,7 +48,6 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.server.security.AuthConfig;
@@ -495,11 +494,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
{
super(
toolbox,
- supervisorTask.getId(),
- supervisorTask.getGroupId(),
- supervisorTask.getIngestionSchema(),
- supervisorTask.getContext(),
- CentralizedDatasourceSchemaConfig.enabled(true)
+ supervisorTask
);
this.supervisorTask = supervisorTask;
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index fe573bf0f63..cc3cdffa747 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -93,13 +93,13 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
@Rule
public ExpectedException expectedException = ExpectedException.none();
- @Parameterized.Parameters(name = "{0}, useInputFormatApi={1},
useSegmentCache={2}")
+ @Parameterized.Parameters(name = "{0}, useInputFormatApi={1},
useSegmentCache={2}, useConcurrentLocks={3}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
- new Object[]{LockGranularity.TIME_CHUNK, false, false},
- new Object[]{LockGranularity.TIME_CHUNK, true, true},
- new Object[]{LockGranularity.SEGMENT, true, false}
+ new Object[]{LockGranularity.TIME_CHUNK, false, false, false},
+ new Object[]{LockGranularity.TIME_CHUNK, true, true, true},
+ new Object[]{LockGranularity.SEGMENT, true, false, true}
);
}
@@ -108,18 +108,21 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
private final LockGranularity lockGranularity;
private final boolean useInputFormatApi;
+ private final boolean useConcurrentLocks;
private File inputDir;
public SinglePhaseParallelIndexingTest(
LockGranularity lockGranularity,
boolean useInputFormatApi,
- boolean useSegmentMetadataCache
+ boolean useSegmentMetadataCache,
+ boolean useConcurrentLocks
)
{
super(DEFAULT_TRANSIENT_TASK_FAILURE_RATE,
DEFAULT_TRANSIENT_API_FAILURE_RATE, useSegmentMetadataCache);
this.lockGranularity = lockGranularity;
this.useInputFormatApi = useInputFormatApi;
+ this.useConcurrentLocks = useConcurrentLocks;
}
@Before
@@ -1004,7 +1007,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
null,
null,
ingestionSpec,
- Collections.emptyMap()
+ Map.of(Tasks.USE_CONCURRENT_LOCKS, useConcurrentLocks)
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]