This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch 28.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/28.0.0 by this push:
     new 0c4c1cf647f A Replacing task must read segments created before it acquired its lock (#15085) (#15208)
0c4c1cf647f is described below
commit 0c4c1cf647fc438bf304335ed4df2bdcf41626bc
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Thu Oct 19 14:39:44 2023 +0530
    A Replacing task must read segments created before it acquired its lock (#15085) (#15208)
    * Replacing tasks must read segments created before they acquired their locks
---
.../MaterializedViewSupervisor.java | 3 +-
.../actions/RetrieveSegmentsToReplaceAction.java | 209 +++++++++++++++++++++
.../druid/indexing/common/actions/TaskAction.java | 1 +
.../task/batch/parallel/AbstractBatchSubtask.java | 16 +-
.../parallel/PartialDimensionCardinalityTask.java | 15 +-
.../parallel/PartialDimensionDistributionTask.java | 15 +-
.../parallel/PartialHashSegmentGenerateTask.java | 12 +-
.../parallel/PartialRangeSegmentGenerateTask.java | 12 +-
.../batch/parallel/PartialSegmentGenerateTask.java | 3 +-
.../batch/parallel/PartialSegmentMergeTask.java | 17 +-
.../batch/parallel/PerfectRollupWorkerTask.java | 5 +-
.../task/batch/parallel/SinglePhaseSubTask.java | 15 +-
.../druid/indexing/input/DruidInputSource.java | 12 +-
.../parallel/PerfectRollupWorkerTaskTest.java | 8 +-
.../concurrent/ConcurrentReplaceAndAppendTest.java | 66 +++++++
.../TestIndexerMetadataStorageCoordinator.java | 2 +-
.../IndexerMetadataStorageCoordinator.java | 6 +-
.../IndexerSQLMetadataStorageCoordinator.java | 25 ++-
.../druid/metadata/SqlSegmentsMetadataQuery.java | 115 ++++++++----
.../IndexerSQLMetadataStorageCoordinatorTest.java | 46 +++++
20 files changed, 478 insertions(+), 125 deletions(-)
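
The rule enforced by this change can be summed up before reading the diff. The sketch below is illustrative only and not part of the commit; the helper name and parameters are hypothetical, but the comparison mirrors the check added in RetrieveSegmentsToReplaceAction further down (created_date and the REPLACE lock version are both time-ordered strings, so lexicographic comparison suffices):

    // Hypothetical helper: a segment is an input to a replacing task only if no REPLACE
    // lock covers its interval, or it was created strictly before the lock's version.
    static boolean isVisibleToReplaceTask(String createdDate, String lockVersion)
    {
      return lockVersion == null || lockVersion.compareTo(createdDate) > 0;
    }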
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index ac2738534da..d0a035be17c 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -40,6 +40,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
@@ -364,7 +365,7 @@ public class MaterializedViewSupervisor implements Supervisor
     // Pair<interval -> max(created_date), interval -> list<DataSegment>>
     Pair<Map<Interval, String>, Map<Interval, List<DataSegment>>> baseSegmentsSnapshot =
         getMaxCreateDateAndBaseSegments(
-            metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(spec.getBaseDataSource())
+            metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(spec.getBaseDataSource(), Intervals.ETERNITY)
         );
// baseSegments are used to create HadoopIndexTask
Map<Interval, List<DataSegment>> baseSegments = baseSegmentsSnapshot.rhs;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java
new file mode 100644
index 00000000000..78e6ada5c1e
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.batch.parallel.AbstractBatchSubtask;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.ReplaceTaskLock;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.Partitions;
+import org.apache.druid.timeline.SegmentTimeline;
+import org.joda.time.Interval;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This action exists in addition to retrieveUsedSegmentsAction because that action suffers
+ * from a race condition described by the following sequence of events:
+ *
+ * -Segments S1, S2, S3 exist
+ * -Compact acquires a replace lock
+ * -A concurrent appending job publishes a segment S4 which needs to be upgraded to the replace lock's version
+ * -Compact task processes S1-S4 to create new segments
+ * -Compact task publishes new segments and carries S4 forward to the new version
+ *
+ * This can lead to the data in S4 being duplicated
+ *
+ * This TaskAction returns a collection of segments which have data within the specified interval and are marked as
+ * used, and have been created before a REPLACE lock, if any, was acquired.
+ * This ensures that a consistent set of segments is returned each time this action is called
+ */
+public class RetrieveSegmentsToReplaceAction implements TaskAction<Collection<DataSegment>>
+{
+  private static final Logger log = new Logger(RetrieveSegmentsToReplaceAction.class);
+
+ @JsonIgnore
+ private final String dataSource;
+
+ @JsonIgnore
+ private final Interval interval;
+
+ @JsonCreator
+ public RetrieveSegmentsToReplaceAction(
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("interval") Interval interval
+ )
+ {
+ this.dataSource = dataSource;
+ this.interval = interval;
+ }
+
+ @JsonProperty
+ public String getDataSource()
+ {
+ return dataSource;
+ }
+
+ @JsonProperty
+ public Interval getInterval()
+ {
+ return interval;
+ }
+
+ @Override
+ public TypeReference<Collection<DataSegment>> getReturnTypeReference()
+ {
+ return new TypeReference<Collection<DataSegment>>() {};
+ }
+
+ @Override
+ public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+ {
+    // The DruidInputSource can be used to read from one datasource and write to another.
+    // In such a case, the race condition described in the class-level docs cannot occur,
+    // and the action can simply fetch all visible segments for the datasource and interval
+ if (!task.getDataSource().equals(dataSource)) {
+ return retrieveAllVisibleSegments(toolbox);
+ }
+
+ final String supervisorId;
+ if (task instanceof AbstractBatchSubtask) {
+ supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
+ } else {
+ supervisorId = task.getId();
+ }
+
+ final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+ .getTaskLockbox()
+ .getAllReplaceLocksForDatasource(task.getDataSource())
+ .stream()
+ .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
+ .collect(Collectors.toSet());
+
+    // If there are no replace locks for the task, simply fetch all visible segments for the interval
+ if (replaceLocksForTask.isEmpty()) {
+ return retrieveAllVisibleSegments(toolbox);
+ }
+
+    Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments = new HashMap<>();
+    for (Pair<DataSegment, String> segmentAndCreatedDate :
+        toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, interval)) {
+      final DataSegment segment = segmentAndCreatedDate.lhs;
+      final String created = segmentAndCreatedDate.rhs;
+      intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s -> new HashMap<>())
+                                 .computeIfAbsent(created, c -> new HashSet<>())
+                                 .add(segment);
+ }
+
+ Set<DataSegment> allSegmentsToBeReplaced = new HashSet<>();
+    for (final Map.Entry<Interval, Map<String, Set<DataSegment>>> entry : intervalToCreatedToSegments.entrySet()) {
+ final Interval segmentInterval = entry.getKey();
+ String lockVersion = null;
+ for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
+ if (replaceLock.getInterval().contains(segmentInterval)) {
+ lockVersion = replaceLock.getVersion();
+ }
+ }
+      final Map<String, Set<DataSegment>> createdToSegmentsMap = entry.getValue();
+      for (Map.Entry<String, Set<DataSegment>> createdAndSegments : createdToSegmentsMap.entrySet()) {
+        if (lockVersion == null || lockVersion.compareTo(createdAndSegments.getKey()) > 0) {
+ allSegmentsToBeReplaced.addAll(createdAndSegments.getValue());
+ } else {
+ for (DataSegment segment : createdAndSegments.getValue()) {
+ log.info("Ignoring segment[%s] as it has created_date[%s] greater
than the REPLACE lock version[%s]",
+ segment.getId(), createdAndSegments.getKey(),
lockVersion);
+ }
+ }
+ }
+ }
+
+    return SegmentTimeline.forSegments(allSegmentsToBeReplaced)
+                          .findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
+ }
+
+  private Collection<DataSegment> retrieveAllVisibleSegments(TaskActionToolbox toolbox)
+  {
+    return toolbox.getIndexerMetadataStorageCoordinator()
+                  .retrieveUsedSegmentsForInterval(dataSource, interval, Segments.ONLY_VISIBLE);
+ }
+
+ @Override
+ public boolean isAudited()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ RetrieveSegmentsToReplaceAction that = (RetrieveSegmentsToReplaceAction) o;
+
+ if (!dataSource.equals(that.dataSource)) {
+ return false;
+ }
+ return interval.equals(that.interval);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(dataSource, interval);
+ }
+
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + "{" +
+ "dataSource='" + dataSource + '\'' +
+ ", interval=" + interval +
+ '}';
+ }
+}
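
A minimal usage sketch of the new action, mirroring the test helper added in ConcurrentReplaceAndAppendTest further below; the datasource name and interval here are placeholders:

    // Submit the action through the task's TaskActionClient to obtain a consistent
    // set of input segments for the given datasource and interval.
    Collection<DataSegment> inputSegments = toolbox.getTaskActionClient().submit(
        new RetrieveSegmentsToReplaceAction("wiki", Intervals.of("2023-01-01/2023-02-01"))
    );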
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
index 171d53b9cdd..e251626f869 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
@@ -38,6 +38,7 @@ import java.util.concurrent.Future;
    @JsonSubTypes.Type(name = "segmentTransactionalInsert", value = SegmentTransactionalInsertAction.class),
    @JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class),
    @JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
+   @JsonSubTypes.Type(name = "retrieveSegmentsToReplace", value = RetrieveSegmentsToReplaceAction.class),
    // Type name doesn't correspond to the name of the class for backward compatibility.
    @JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class),
    // Type name doesn't correspond to the name of the class for backward compatibility.
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java
index 37b70c53ed5..e6de0cf2594 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.common.task.batch.parallel;
+import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.TaskResource;
@@ -29,16 +30,20 @@ import java.util.Map;
public abstract class AbstractBatchSubtask extends AbstractBatchIndexTask
{
+ private final String supervisorTaskId;
+
protected AbstractBatchSubtask(
String id,
@Nullable String groupId,
@Nullable TaskResource taskResource,
String dataSource,
@Nullable Map<String, Object> context,
- @Nonnull IngestionMode ingestionMode
+ @Nonnull IngestionMode ingestionMode,
+ @Nonnull String supervisorTaskId
)
{
super(id, groupId, taskResource, dataSource, context, -1, ingestionMode);
+ this.supervisorTaskId = supervisorTaskId;
}
/**
@@ -46,4 +51,13 @@ public abstract class AbstractBatchSubtask extends AbstractBatchIndexTask
    * This ID is used to identify duplicate work of retry tasks for the same spec.
*/
public abstract String getSubtaskSpecId();
+
+ /**
+   * @return Task ID of the {@code ParallelIndexSupervisorTask} which launched this sub-task.
+ */
+ @JsonProperty
+ public String getSupervisorTaskId()
+ {
+ return supervisorTaskId;
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
index 625b2051783..698f8c9d0e9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
@@ -69,7 +69,6 @@ public class PartialDimensionCardinalityTask extends
PerfectRollupWorkerTask
private final int numAttempts;
private final ParallelIndexIngestionSpec ingestionSchema;
- private final String supervisorTaskId;
private final String subtaskSpecId;
private final ObjectMapper jsonMapper;
@@ -95,7 +94,8 @@ public class PartialDimensionCardinalityTask extends
PerfectRollupWorkerTask
taskResource,
ingestionSchema.getDataSchema(),
ingestionSchema.getTuningConfig(),
- context
+ context,
+ supervisorTaskId
);
Preconditions.checkArgument(
@@ -107,7 +107,6 @@ public class PartialDimensionCardinalityTask extends
PerfectRollupWorkerTask
this.subtaskSpecId = subtaskSpecId;
this.numAttempts = numAttempts;
this.ingestionSchema = ingestionSchema;
- this.supervisorTaskId = supervisorTaskId;
this.jsonMapper = jsonMapper;
}
@@ -123,12 +122,6 @@ public class PartialDimensionCardinalityTask extends
PerfectRollupWorkerTask
return ingestionSchema;
}
- @JsonProperty
- private String getSupervisorTaskId()
- {
- return supervisorTaskId;
- }
-
@JsonProperty
@Override
public String getSubtaskSpecId()
@@ -163,7 +156,7 @@ public class PartialDimensionCardinalityTask extends
PerfectRollupWorkerTask
{
if
(!getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals().isEmpty())
{
return tryTimeChunkLock(
- new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+ new SurrogateTaskActionClient(getSupervisorTaskId(),
taskActionClient),
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
);
} else {
@@ -274,7 +267,7 @@ public class PartialDimensionCardinalityTask extends
PerfectRollupWorkerTask
{
final ParallelIndexSupervisorTaskClient taskClient =
toolbox.getSupervisorTaskClientProvider().build(
- supervisorTaskId,
+ getSupervisorTaskId(),
ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
index b2ecd3dc269..8f03c3bfa55 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
@@ -82,7 +82,6 @@ public class PartialDimensionDistributionTask extends
PerfectRollupWorkerTask
private final int numAttempts;
private final ParallelIndexIngestionSpec ingestionSchema;
- private final String supervisorTaskId;
private final String subtaskSpecId;
// For testing
@@ -136,7 +135,8 @@ public class PartialDimensionDistributionTask extends
PerfectRollupWorkerTask
taskResource,
ingestionSchema.getDataSchema(),
ingestionSchema.getTuningConfig(),
- context
+ context,
+ supervisorTaskId
);
Preconditions.checkArgument(
@@ -148,7 +148,6 @@ public class PartialDimensionDistributionTask extends
PerfectRollupWorkerTask
this.subtaskSpecId = subtaskSpecId;
this.numAttempts = numAttempts;
this.ingestionSchema = ingestionSchema;
- this.supervisorTaskId = supervisorTaskId;
this.dedupInputRowFilterSupplier = dedupRowDimValueFilterSupplier;
}
@@ -164,12 +163,6 @@ public class PartialDimensionDistributionTask extends
PerfectRollupWorkerTask
return ingestionSchema;
}
- @JsonProperty
- private String getSupervisorTaskId()
- {
- return supervisorTaskId;
- }
-
@JsonProperty
@Override
public String getSubtaskSpecId()
@@ -204,7 +197,7 @@ public class PartialDimensionDistributionTask extends
PerfectRollupWorkerTask
{
if
(!getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals().isEmpty())
{
return tryTimeChunkLock(
- new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+ new SurrogateTaskActionClient(getSupervisorTaskId(),
taskActionClient),
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
);
} else {
@@ -326,7 +319,7 @@ public class PartialDimensionDistributionTask extends
PerfectRollupWorkerTask
private void sendReport(TaskToolbox toolbox, DimensionDistributionReport
report)
{
final ParallelIndexSupervisorTaskClient taskClient =
toolbox.getSupervisorTaskClientProvider().build(
- supervisorTaskId,
+ getSupervisorTaskId(),
ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
index b91a6ce3a82..49e3591ff18 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
@@ -63,7 +63,6 @@ public class PartialHashSegmentGenerateTask extends
PartialSegmentGenerateTask<G
private final int numAttempts;
private final ParallelIndexIngestionSpec ingestionSchema;
- private final String supervisorTaskId;
private final String subtaskSpecId;
@Nullable
private final Map<Interval, Integer> intervalToNumShardsOverride;
@@ -96,7 +95,6 @@ public class PartialHashSegmentGenerateTask extends
PartialSegmentGenerateTask<G
this.subtaskSpecId = subtaskSpecId;
this.numAttempts = numAttempts;
this.ingestionSchema = ingestionSchema;
- this.supervisorTaskId = supervisorTaskId;
this.intervalToNumShardsOverride = intervalToNumShardsOverride;
}
@@ -112,12 +110,6 @@ public class PartialHashSegmentGenerateTask extends
PartialSegmentGenerateTask<G
return ingestionSchema;
}
- @JsonProperty
- public String getSupervisorTaskId()
- {
- return supervisorTaskId;
- }
-
@JsonProperty
@Override
public String getSubtaskSpecId()
@@ -158,7 +150,7 @@ public class PartialHashSegmentGenerateTask extends
PartialSegmentGenerateTask<G
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return tryTimeChunkLock(
- new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+ new SurrogateTaskActionClient(getSupervisorTaskId(), taskActionClient),
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
);
}
@@ -175,7 +167,7 @@ public class PartialHashSegmentGenerateTask extends
PartialSegmentGenerateTask<G
getDataSource(),
getSubtaskSpecId(),
granularitySpec,
- new SupervisorTaskAccess(supervisorTaskId, taskClient),
+ new SupervisorTaskAccess(getSupervisorTaskId(), taskClient),
createHashPartitionAnalysisFromPartitionsSpec(
granularitySpec,
partitionsSpec,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
index 147a1fbf121..27604eb7e77 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
@@ -64,7 +64,6 @@ public class PartialRangeSegmentGenerateTask extends
PartialSegmentGenerateTask<
private static final String PROP_SPEC = "spec";
private static final boolean SKIP_NULL = true;
- private final String supervisorTaskId;
private final String subtaskSpecId;
private final int numAttempts;
private final ParallelIndexIngestionSpec ingestionSchema;
@@ -98,7 +97,6 @@ public class PartialRangeSegmentGenerateTask extends
PartialSegmentGenerateTask<
this.subtaskSpecId = subtaskSpecId;
this.numAttempts = numAttempts;
this.ingestionSchema = ingestionSchema;
- this.supervisorTaskId = supervisorTaskId;
this.intervalToPartitions = intervalToPartitions;
}
@@ -131,12 +129,6 @@ public class PartialRangeSegmentGenerateTask extends
PartialSegmentGenerateTask<
return ingestionSchema;
}
- @JsonProperty
- public String getSupervisorTaskId()
- {
- return supervisorTaskId;
- }
-
@JsonProperty
@Override
public String getSubtaskSpecId()
@@ -176,7 +168,7 @@ public class PartialRangeSegmentGenerateTask extends
PartialSegmentGenerateTask<
public boolean isReady(TaskActionClient taskActionClient) throws IOException
{
return tryTimeChunkLock(
- new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+ new SurrogateTaskActionClient(getSupervisorTaskId(), taskActionClient),
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
);
}
@@ -194,7 +186,7 @@ public class PartialRangeSegmentGenerateTask extends
PartialSegmentGenerateTask<
getDataSource(),
getSubtaskSpecId(),
ingestionSchema.getDataSchema().getGranularitySpec(),
- new SupervisorTaskAccess(supervisorTaskId, taskClient),
+ new SupervisorTaskAccess(getSupervisorTaskId(), taskClient),
partitionAnalysis
);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index dc2a7ef5bf9..e20c7bdbe35 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -94,7 +94,8 @@ abstract class PartialSegmentGenerateTask<T extends
GeneratedPartitionsReport> e
taskResource,
ingestionSchema.getDataSchema(),
ingestionSchema.getTuningConfig(),
- context
+ context,
+ supervisorTaskId
);
Preconditions.checkArgument(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
index bb933169d4b..b59ec65716c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
@@ -78,7 +78,6 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec>
extends PerfectRollu
private final PartialSegmentMergeIOConfig ioConfig;
private final int numAttempts;
- private final String supervisorTaskId;
private final String subtaskSpecId;
PartialSegmentMergeTask(
@@ -101,7 +100,8 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec>
extends PerfectRollu
taskResource,
dataSchema,
tuningConfig,
- context
+ context,
+ supervisorTaskId
);
Preconditions.checkArgument(
@@ -111,7 +111,6 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec>
extends PerfectRollu
this.subtaskSpecId = subtaskSpecId;
this.ioConfig = ioConfig;
this.numAttempts = numAttempts;
- this.supervisorTaskId = supervisorTaskId;
}
@JsonProperty
@@ -120,12 +119,6 @@ abstract class PartialSegmentMergeTask<S extends
ShardSpec> extends PerfectRollu
return numAttempts;
}
- @JsonProperty
- public String getSupervisorTaskId()
- {
- return supervisorTaskId;
- }
-
@JsonProperty
@Override
public String getSubtaskSpecId()
@@ -151,7 +144,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec>
extends PerfectRollu
}
final List<TaskLock> locks = toolbox.getTaskActionClient().submit(
- new SurrogateAction<>(supervisorTaskId, new LockListAction())
+ new SurrogateAction<>(getSupervisorTaskId(), new LockListAction())
);
final Map<Interval, String> intervalToVersion =
Maps.newHashMapWithExpectedSize(locks.size());
locks.forEach(lock -> {
@@ -179,7 +172,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec>
extends PerfectRollu
LOG.info("Fetch took [%s] seconds", fetchTime);
final ParallelIndexSupervisorTaskClient taskClient =
toolbox.getSupervisorTaskClientProvider().build(
- supervisorTaskId,
+ getSupervisorTaskId(),
getTuningConfig().getChatHandlerTimeout(),
getTuningConfig().getChatHandlerNumRetries()
);
@@ -225,7 +218,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec>
extends PerfectRollu
);
FileUtils.mkdirp(partitionDir);
for (PartitionLocation location : entryPerBucketId.getValue()) {
- final File unzippedDir =
toolbox.getShuffleClient().fetchSegmentFile(partitionDir, supervisorTaskId,
location);
+ final File unzippedDir =
toolbox.getShuffleClient().fetchSegmentFile(partitionDir,
getSupervisorTaskId(), location);
intervalToUnzippedFiles.computeIfAbsent(interval, k -> new
Int2ObjectOpenHashMap<>())
.computeIfAbsent(bucketId, k -> new ArrayList<>())
.add(unzippedDir);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
index 4259922b43a..3b00f0fedf6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
@@ -49,10 +49,11 @@ abstract class PerfectRollupWorkerTask extends AbstractBatchSubtask
@Nullable TaskResource taskResource,
DataSchema dataSchema,
ParallelIndexTuningConfig tuningConfig,
- @Nullable Map<String, Object> context
+ @Nullable Map<String, Object> context,
+ String supervisorTaskId
)
{
-    super(id, groupId, taskResource, dataSchema.getDataSource(), context, IngestionMode.NONE);
+    super(id, groupId, taskResource, dataSchema.getDataSource(), context, IngestionMode.NONE, supervisorTaskId);
Preconditions.checkArgument(
tuningConfig.isForceGuaranteedRollup(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 183cde7c66d..a3bd47d2960 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -118,7 +118,6 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask implements ChatHand
private final int numAttempts;
private final ParallelIndexIngestionSpec ingestionSchema;
- private final String supervisorTaskId;
private final String subtaskSpecId;
/**
@@ -169,7 +168,8 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask implements ChatHand
taskResource,
ingestionSchema.getDataSchema().getDataSource(),
context,
- AbstractTask.computeBatchIngestionMode(ingestionSchema.getIOConfig())
+ AbstractTask.computeBatchIngestionMode(ingestionSchema.getIOConfig()),
+ supervisorTaskId
);
if (ingestionSchema.getTuningConfig().isForceGuaranteedRollup()) {
@@ -179,7 +179,6 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask implements ChatHand
this.subtaskSpecId = subtaskSpecId;
this.numAttempts = numAttempts;
this.ingestionSchema = ingestionSchema;
- this.supervisorTaskId = supervisorTaskId;
this.missingIntervalsInOverwriteMode =
ingestionSchema.getIOConfig().isAppendToExisting() != true
&& ingestionSchema.getDataSchema()
.getGranularitySpec()
@@ -217,7 +216,7 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask implements ChatHand
public boolean isReady(TaskActionClient taskActionClient) throws IOException
{
return determineLockGranularityAndTryLock(
- new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+ new SurrogateTaskActionClient(getSupervisorTaskId(), taskActionClient),
ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals()
);
}
@@ -234,12 +233,6 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask implements ChatHand
return ingestionSchema;
}
- @JsonProperty
- public String getSupervisorTaskId()
- {
- return supervisorTaskId;
- }
-
@Override
@JsonProperty
public String getSubtaskSpecId()
@@ -272,7 +265,7 @@ public class SinglePhaseSubTask extends
AbstractBatchSubtask implements ChatHand
final InputSource inputSource =
ingestionSchema.getIOConfig().getNonNullInputSource(toolbox);
final ParallelIndexSupervisorTaskClient taskClient =
toolbox.getSupervisorTaskClientProvider().build(
- supervisorTaskId,
+ getSupervisorTaskId(),
ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
index bf8b4bfb1d5..8056c69901f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
@@ -48,10 +48,9 @@ import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
+import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
-import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Comparators;
@@ -552,14 +551,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
} else {
try {
usedSegments = toolbox.getTaskActionClient()
- .submit(
- new RetrieveUsedSegmentsAction(
- dataSource,
- null,
- Collections.singletonList(interval),
- Segments.ONLY_VISIBLE
- )
- );
+                               .submit(new RetrieveSegmentsToReplaceAction(dataSource, interval));
}
catch (IOException e) {
        LOG.error(e, "Error retrieving the used segments for interval[%s].", interval);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
index 71b474e7735..b95fd53c74f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
@@ -111,7 +111,8 @@ public class PerfectRollupWorkerTaskTest
null,
createDataSchema(granularitySpecInputIntervals),
createTuningConfig(forceGuaranteedRollup, partitionsSpec),
- null
+ null,
+ "supervisor-id"
);
}
@@ -149,10 +150,11 @@ public class PerfectRollupWorkerTaskTest
@Nullable TaskResource taskResource,
DataSchema dataSchema,
ParallelIndexTuningConfig tuningConfig,
- @Nullable Map<String, Object> context
+ @Nullable Map<String, Object> context,
+ String supervisorId
)
{
- super(id, groupId, taskResource, dataSchema, tuningConfig, context);
+      super(id, groupId, taskResource, dataSchema, tuningConfig, context, supervisorId);
}
@Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index 22f21fb79b6..1c4b6809c38 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -27,6 +27,7 @@ import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
@@ -845,6 +846,54 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
     verifyIntervalHasVisibleSegments(YEAR_23, segmentV20, segmentV21, segmentV22, segmentV23);
   }
+ @Test
+ public void testSegmentsToReplace()
+ {
+    final SegmentIdWithShardSpec pendingSegmentV01
+        = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
+    Assert.assertEquals(SEGMENT_V0, pendingSegmentV01.getVersion());
+    Assert.assertEquals(JAN_23, pendingSegmentV01.getInterval());
+    final DataSegment segment1 = asSegment(pendingSegmentV01);
+    appendTask.commitAppendSegments(segment1);
+
+    final SegmentIdWithShardSpec pendingSegmentV02
+        = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
+    Assert.assertNotEquals(pendingSegmentV01.asSegmentId(), pendingSegmentV02.asSegmentId());
+    Assert.assertEquals(SEGMENT_V0, pendingSegmentV02.getVersion());
+    Assert.assertEquals(JAN_23, pendingSegmentV02.getInterval());
+
+    verifyInputSegments(replaceTask, JAN_23, segment1);
+
+    replaceTask.acquireReplaceLockOn(JAN_23);
+
+    final DataSegment segment2 = asSegment(pendingSegmentV02);
+    appendTask.commitAppendSegments(segment2);
+
+    // Despite segment2 existing, it is not chosen to be replaced because it was created after the tasklock was acquired
+    verifyInputSegments(replaceTask, JAN_23, segment1);
+
+    replaceTask.releaseLock(JAN_23);
+
+    final SegmentIdWithShardSpec pendingSegmentV03
+        = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
+    Assert.assertNotEquals(pendingSegmentV01.asSegmentId(), pendingSegmentV03.asSegmentId());
+    Assert.assertNotEquals(pendingSegmentV02.asSegmentId(), pendingSegmentV03.asSegmentId());
+    Assert.assertEquals(SEGMENT_V0, pendingSegmentV03.getVersion());
+    Assert.assertEquals(JAN_23, pendingSegmentV03.getInterval());
+    final DataSegment segment3 = asSegment(pendingSegmentV03);
+    appendTask.commitAppendSegments(segment3);
+    appendTask.releaseLock(JAN_23);
+
+    replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23);
+    // The new lock was acquired before segment3 was created but it doesn't contain the month's interval
+    // So, all three segments are chosen
+    verifyInputSegments(replaceTask, JAN_23, segment1, segment2, segment3);
+
+    replaceTask.releaseLock(FIRST_OF_JAN_23);
+    // All the segments are chosen when there's no lock
+    verifyInputSegments(replaceTask, JAN_23, segment1, segment2, segment3);
+ }
+
@Nullable
private DataSegment findSegmentWith(String version, Map<String, Object>
loadSpec, Set<DataSegment> segments)
{
@@ -901,6 +950,23 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
}
}
+  private void verifyInputSegments(Task task, Interval interval, DataSegment... expectedSegments)
+  {
+    try {
+      final TaskActionClient taskActionClient = taskActionClientFactory.create(task);
+      Collection<DataSegment> allUsedSegments = taskActionClient.submit(
+          new RetrieveSegmentsToReplaceAction(
+              WIKI,
+              interval
+          )
+      );
+      Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments));
+    }
+    catch (IOException e) {
+      throw new ISE(e, "Error while fetching segments to replace in interval[%s]", interval);
+    }
+  }
+
private TaskToolboxFactory createToolboxFactory(
TaskConfig taskConfig,
TaskActionClientFactory taskActionClientFactory
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index d1c72485011..108833422c8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -89,7 +89,7 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
   }
   @Override
-  public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource)
+  public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval)
{
return ImmutableList.of();
}
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 2c2a6bc0f77..7c6710048a1 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -82,8 +82,9 @@ public interface IndexerMetadataStorageCoordinator
  Collection<DataSegment> retrieveAllUsedSegments(String dataSource, Segments visibility);
  /**
+   *
   * Retrieve all published segments which are marked as used and the created_date of these segments belonging to the
-   * given data source from the metadata store.
+   * given data source and interval from the metadata store.
   *
   * Unlike other similar methods in this interface, this method doesn't accept a {@link Segments} "visibility"
   * parameter. The returned collection may include overshadowed segments and their created_dates, as if {@link
@@ -91,10 +92,11 @@ public interface IndexerMetadataStorageCoordinator
* if needed.
*
* @param dataSource The data source to query
+ * @param interval The interval to query
*
* @return The DataSegments and the related created_date of segments
*/
-  Collection<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource);
+  Collection<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval);
  /**
   * Retrieve all published segments which may include any data in the given intervals and are marked as used from the
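
A small illustration of the changed signature, assuming a coordinator instance; passing Intervals.ETERNITY preserves the previous "all intervals" behaviour, as the MaterializedViewSupervisor change above does:

    // Fetch used segments and their created_date values for one datasource and interval.
    Collection<Pair<DataSegment, String>> segmentsAndCreatedDates =
        coordinator.retrieveUsedSegmentsAndCreatedDates("wiki", Intervals.of("2023/2024"));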
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 226663c3233..c654d5e229b 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -174,15 +174,34 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
   }
   @Override
-  public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource)
+  public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval)
   {
-    String rawQueryString = "SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource AND used = true";
-    final String queryString = StringUtils.format(rawQueryString, dbTables.getSegmentsTable());
+    StringBuilder queryBuilder = new StringBuilder(
+        "SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource AND used = true"
+    );
+
+ final List<Interval> intervals = new ArrayList<>();
+ // Do not need an interval condition if the interval is ETERNITY
+ if (!Intervals.isEternity(interval)) {
+ intervals.add(interval);
+ }
+
+ SqlSegmentsMetadataQuery.appendConditionForIntervalsAndMatchMode(
+ queryBuilder,
+ intervals,
+ SqlSegmentsMetadataQuery.IntervalMode.OVERLAPS,
+ connector
+ );
+
+    final String queryString = StringUtils.format(queryBuilder.toString(), dbTables.getSegmentsTable());
return connector.retryWithHandle(
handle -> {
Query<Map<String, Object>> query = handle
.createQuery(queryString)
.bind("dataSource", dataSource);
+
+ SqlSegmentsMetadataQuery.bindQueryIntervals(query, intervals);
+
return query
.map((int index, ResultSet r, StatementContext ctx) ->
new Pair<>(
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index 20b176c5091..76e4f974576 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -261,6 +261,82 @@ public class SqlSegmentsMetadataQuery
return null;
}
+  /**
+   * Append the condition for the interval and match mode to the given string builder with a partial query
+   * @param sb - StringBuilder containing the partial query with SELECT clause and WHERE condition for used, datasource
+   * @param intervals - intervals to fetch the segments for
+   * @param matchMode - Interval match mode - overlaps or contains
+   * @param connector - SQL connector
+   */
+ public static void appendConditionForIntervalsAndMatchMode(
+ final StringBuilder sb,
+ final Collection<Interval> intervals,
+ final IntervalMode matchMode,
+ final SQLMetadataConnector connector
+ )
+ {
+ if (intervals.isEmpty()) {
+ return;
+ }
+
+ sb.append(" AND (");
+ for (int i = 0; i < intervals.size(); i++) {
+ sb.append(
+ matchMode.makeSqlCondition(
+ connector.getQuoteString(),
+ StringUtils.format(":start%d", i),
+ StringUtils.format(":end%d", i)
+ )
+ );
+
+      // Add a special check for a segment which have one end at eternity and the other at some finite value. Since
+      // we are using string comparison, a segment with this start or end will not be returned otherwise.
+      if (matchMode.equals(IntervalMode.OVERLAPS)) {
+        sb.append(StringUtils.format(
+            " OR (start = '%s' AND \"end\" != '%s' AND \"end\" > :start%d)",
+            Intervals.ETERNITY.getStart(), Intervals.ETERNITY.getEnd(), i
+        ));
+        sb.append(StringUtils.format(
+            " OR (start != '%s' AND \"end\" = '%s' AND start < :end%d)",
+            Intervals.ETERNITY.getStart(), Intervals.ETERNITY.getEnd(), i
+        ));
+ }
+
+ if (i != intervals.size() - 1) {
+ sb.append(" OR ");
+ }
+ }
+
+    // Add a special check for a single segment with eternity. Since we are using string comparison, a segment with
+    // this start and end will not be returned otherwise.
+    // Known Issue: https://github.com/apache/druid/issues/12860
+    if (matchMode.equals(IntervalMode.OVERLAPS)) {
+      sb.append(StringUtils.format(
+          " OR (start = '%s' AND \"end\" = '%s')", Intervals.ETERNITY.getStart(), Intervals.ETERNITY.getEnd()
+      ));
+ }
+ sb.append(")");
+ }
+
+ /**
+ * Given a Query object bind the input intervals to it
+ * @param query Query to fetch segments
+ * @param intervals Intervals to fetch segments for
+ */
+  public static void bindQueryIntervals(final Query<Map<String, Object>> query, final Collection<Interval> intervals)
+ {
+ if (intervals.isEmpty()) {
+ return;
+ }
+
+ final Iterator<Interval> iterator = intervals.iterator();
+ for (int i = 0; iterator.hasNext(); i++) {
+ Interval interval = iterator.next();
+ query.bind(StringUtils.format("start%d", i),
interval.getStart().toString())
+ .bind(StringUtils.format("end%d", i), interval.getEnd().toString());
+ }
+ }
+
private CloseableIterator<DataSegment> retrieveSegments(
final String dataSource,
final Collection<Interval> intervals,
@@ -275,36 +351,8 @@ public class SqlSegmentsMetadataQuery
final StringBuilder sb = new StringBuilder();
sb.append("SELECT payload FROM %s WHERE used = :used AND dataSource =
:dataSource");
- if (compareAsString && !intervals.isEmpty()) {
- sb.append(" AND (");
- for (int i = 0; i < intervals.size(); i++) {
- sb.append(
- matchMode.makeSqlCondition(
- connector.getQuoteString(),
- StringUtils.format(":start%d", i),
- StringUtils.format(":end%d", i)
- )
- );
-
-        // Add a special check for a segment which have one end at eternity and the other at some finite value. Since
-        // we are using string comparison, a segment with this start or end will not be returned otherwise.
- if (matchMode.equals(IntervalMode.OVERLAPS)) {
- sb.append(StringUtils.format(" OR (start = '%s' AND \"end\" != '%s'
AND \"end\" > :start%d)", Intervals.ETERNITY.getStart(),
Intervals.ETERNITY.getEnd(), i));
- sb.append(StringUtils.format(" OR (start != '%s' AND \"end\" = '%s'
AND start < :end%d)", Intervals.ETERNITY.getStart(),
Intervals.ETERNITY.getEnd(), i));
- }
-
- if (i != intervals.size() - 1) {
- sb.append(" OR ");
- }
- }
-
-      // Add a special check for a single segment with eternity. Since we are using string comparison, a segment with
- // this start and end will not be returned otherwise.
- // Known Issue: https://github.com/apache/druid/issues/12860
- if (matchMode.equals(IntervalMode.OVERLAPS)) {
- sb.append(StringUtils.format(" OR (start = '%s' AND \"end\" = '%s')",
Intervals.ETERNITY.getStart(), Intervals.ETERNITY.getEnd()));
- }
- sb.append(")");
+ if (compareAsString) {
+      appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode, connector);
}
final Query<Map<String, Object>> sql = handle
@@ -317,12 +365,7 @@ public class SqlSegmentsMetadataQuery
}
if (compareAsString) {
- final Iterator<Interval> iterator = intervals.iterator();
- for (int i = 0; iterator.hasNext(); i++) {
- Interval interval = iterator.next();
- sql.bind(StringUtils.format("start%d", i),
interval.getStart().toString())
- .bind(StringUtils.format("end%d", i), interval.getEnd().toString());
- }
+ bindQueryIntervals(sql, intervals);
}
final ResultIterator<DataSegment> resultIterator =
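
A sketch of how the two extracted helpers compose, following the pattern used in IndexerSQLMetadataStorageCoordinator above; the table name, handle, intervals, and connector are placeholders, and callers are assumed to live in the same package as SqlSegmentsMetadataQuery:

    // Build the WHERE clause for the intervals, then bind the interval endpoints.
    StringBuilder sql = new StringBuilder(
        "SELECT payload FROM druid_segments WHERE used = true AND dataSource = :dataSource"
    );
    SqlSegmentsMetadataQuery.appendConditionForIntervalsAndMatchMode(
        sql, intervals, SqlSegmentsMetadataQuery.IntervalMode.OVERLAPS, connector
    );
    Query<Map<String, Object>> query = handle.createQuery(sql.toString())
                                             .bind("dataSource", "wiki");
    SqlSegmentsMetadataQuery.bindQueryIntervals(query, intervals);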
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index b1b6f3aa16e..0512357ffc1 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -2554,6 +2554,52 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
}
+ @Test
+ public void testRetrieveUsedSegmentsAndCreatedDates()
+ {
+ insertUsedSegments(ImmutableSet.of(defaultSegment));
+
+    List<Pair<DataSegment, String>> resultForIntervalOnTheLeft =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2000/2001"));
+    Assert.assertTrue(resultForIntervalOnTheLeft.isEmpty());
+
+    List<Pair<DataSegment, String>> resultForIntervalOnTheRight =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("3000/3001"));
+    Assert.assertTrue(resultForIntervalOnTheRight.isEmpty());
+
+    List<Pair<DataSegment, String>> resultForExactInterval =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), defaultSegment.getInterval());
+    Assert.assertEquals(1, resultForExactInterval.size());
+    Assert.assertEquals(defaultSegment, resultForExactInterval.get(0).lhs);
+
+    List<Pair<DataSegment, String>> resultForIntervalWithLeftOverlap =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2000/2015-01-02"));
+    Assert.assertEquals(resultForExactInterval, resultForIntervalWithLeftOverlap);
+
+    List<Pair<DataSegment, String>> resultForIntervalWithRightOverlap =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2015-01-01/3000"));
+    Assert.assertEquals(resultForExactInterval, resultForIntervalWithRightOverlap);
+
+    List<Pair<DataSegment, String>> resultForEternity =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.ETERNITY);
+    Assert.assertEquals(resultForExactInterval, resultForEternity);
+ }
+
+ @Test
+  public void testRetrieveUsedSegmentsAndCreatedDatesFetchesEternityForAnyInterval()
+  {
+
+    insertUsedSegments(ImmutableSet.of(eternitySegment, firstHalfEternityRangeSegment, secondHalfEternityRangeSegment));
+
+    List<Pair<DataSegment, String>> resultForRandomInterval =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), defaultSegment.getInterval());
+    Assert.assertEquals(3, resultForRandomInterval.size());
+
+    List<Pair<DataSegment, String>> resultForEternity =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), eternitySegment.getInterval());
+ Assert.assertEquals(3, resultForEternity.size());
+ }
+
private static class DS
{
static final String WIKI = "wiki";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]