This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d6c760f7ce7 Do not kill segments with referenced load specs from deep storage (#16667)
d6c760f7ce7 is described below
commit d6c760f7ce74b5864cd7d0750736f1609d171fdf
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Mon Jul 15 14:07:53 2024 +0530
Do not kill segments with referenced load specs from deep storage (#16667)
Do not kill segments with referenced load specs from deep storage
---
.../ActionBasedPublishedSegmentRetriever.java | 2 +-
.../RetrieveUpgradedFromSegmentIdsAction.java | 90 ++++++++
.../RetrieveUpgradedToSegmentIdsAction.java | 95 +++++++++
.../druid/indexing/common/actions/TaskAction.java | 2 +
.../actions/UpgradedFromSegmentsResponse.java} | 26 ++-
.../actions/UpgradedToSegmentsResponse.java} | 28 ++-
.../common/task/KillUnusedSegmentsTask.java | 150 ++++++++++++--
.../indexing/common/task/IngestionTestBase.java | 13 +-
.../common/task/KillUnusedSegmentsTaskTest.java | 227 ++++++++++++++++++++-
.../druid/indexing/test/TestDataSegmentKiller.java | 13 +-
.../TestIndexerMetadataStorageCoordinator.java | 18 ++
.../IndexerMetadataStorageCoordinator.java | 17 ++
.../IndexerSQLMetadataStorageCoordinator.java | 118 ++++++++++-
.../druid/metadata/PendingSegmentRecord.java | 5 +-
.../druid/metadata/SQLMetadataConnector.java | 10 +
.../druid/metadata/SqlSegmentsMetadataQuery.java | 13 +-
.../apache/druid/server/http/DataSegmentPlus.java | 23 ++-
.../IndexerSQLMetadataStorageCoordinatorTest.java | 197 +++++++++++++++++-
...dexerSqlMetadataStorageCoordinatorTestBase.java | 49 +++++
.../druid/server/http/DataSegmentPlusTest.java | 3 +-
.../druid/server/http/MetadataResourceTest.java | 2 +-
21 files changed, 1024 insertions(+), 77 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedPublishedSegmentRetriever.java b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedPublishedSegmentRetriever.java
index ba5cf923b12..bb349cc9790 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedPublishedSegmentRetriever.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedPublishedSegmentRetriever.java
@@ -79,7 +79,7 @@ public class ActionBasedPublishedSegmentRetriever implements PublishedSegmentRet
     catch (Exception e) {
       log.warn(
           e,
-          "Could not retrieve published segment IDs[%s] using task action[segmentListById]."
+          "Could not retrieve published segment IDs[%s] using task action[retrieveSegmentsById]."
           + " Overlord maybe on an older version, retrying with action[segmentListUsed]."
          + " This task may fail to publish segments if there is a concurrent replace happening.",
          serializedSegmentIds
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedFromSegmentIdsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedFromSegmentIdsAction.java
new file mode 100644
index 00000000000..67f7ae6e131
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedFromSegmentIdsAction.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.indexing.common.task.Task;
+
+import java.util.Set;
+
+/**
+ * Task action to retrieve the segment IDs from which a given set of segments were upgraded.
+ */
+public class RetrieveUpgradedFromSegmentIdsAction implements TaskAction<UpgradedFromSegmentsResponse>
+{
+ private final String dataSource;
+ private final Set<String> segmentIds;
+
+ @JsonCreator
+ public RetrieveUpgradedFromSegmentIdsAction(
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("segmentIds") Set<String> segmentIds
+ )
+ {
+ this.dataSource = dataSource;
+ this.segmentIds = segmentIds;
+ }
+
+ @JsonProperty
+ public String getDataSource()
+ {
+ return dataSource;
+ }
+
+ @JsonProperty
+ public Set<String> getSegmentIds()
+ {
+ return segmentIds;
+ }
+
+ @Override
+ public TypeReference<UpgradedFromSegmentsResponse> getReturnTypeReference()
+ {
+ return new TypeReference<UpgradedFromSegmentsResponse>()
+ {
+ };
+ }
+
+ @Override
+  public UpgradedFromSegmentsResponse perform(Task task, TaskActionToolbox toolbox)
+ {
+ return new UpgradedFromSegmentsResponse(
+ toolbox.getIndexerMetadataStorageCoordinator()
+ .retrieveUpgradedFromSegmentIds(dataSource, segmentIds)
+ );
+ }
+
+ @Override
+ public boolean isAudited()
+ {
+ return false;
+ }
+
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + "{" +
+ "dataSource='" + dataSource + '\'' +
+ ", segmentIds=" + segmentIds +
+ '}';
+ }
+}
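For context, a minimal sketch of how a task might submit this action through its
TaskActionClient (the data source and segment ID below are illustrative placeholders,
not values from this commit):

    // Hypothetical invocation from within a task's runTask(TaskToolbox toolbox).
    final UpgradedFromSegmentsResponse response = toolbox.getTaskActionClient().submit(
        new RetrieveUpgradedFromSegmentIdsAction("wiki", ImmutableSet.of("segmentId1"))
    );
    // Each entry maps an upgraded segment ID to the segment it was upgraded from;
    // original (non-upgraded) segments have no entry in the map.
    final Map<String, String> upgradedFrom = response.getUpgradedFromSegmentIds();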
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedToSegmentIdsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedToSegmentIdsAction.java
new file mode 100644
index 00000000000..412c9604d11
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedToSegmentIdsAction.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.indexing.common.task.Task;
+
+import java.util.Set;
+
+/**
+ * Task action to determine the set of all segments containing the same load spec given the parent id. <br/>
+ * Returns a map from a segment ID to a set containing:
+ * <ol>
+ * <li> all segment IDs that were upgraded from it AND are still present in the metadata store </li>
+ * <li> the segment ID itself if and only if it is still present in the metadata store </li>
+ * </ol>
+ */
+public class RetrieveUpgradedToSegmentIdsAction implements TaskAction<UpgradedToSegmentsResponse>
+{
+ private final String dataSource;
+ private final Set<String> segmentIds;
+
+ @JsonCreator
+ public RetrieveUpgradedToSegmentIdsAction(
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("segmentIds") Set<String> segmentIds
+ )
+ {
+ this.dataSource = dataSource;
+ this.segmentIds = segmentIds;
+ }
+
+ @JsonProperty
+ public String getDataSource()
+ {
+ return dataSource;
+ }
+
+ @JsonProperty
+ public Set<String> getSegmentIds()
+ {
+ return segmentIds;
+ }
+
+ @Override
+ public TypeReference<UpgradedToSegmentsResponse> getReturnTypeReference()
+ {
+ return new TypeReference<UpgradedToSegmentsResponse>()
+ {
+ };
+ }
+
+ @Override
+  public UpgradedToSegmentsResponse perform(Task task, TaskActionToolbox toolbox)
+ {
+ return new UpgradedToSegmentsResponse(
+ toolbox.getIndexerMetadataStorageCoordinator()
+ .retrieveUpgradedToSegmentIds(dataSource, segmentIds)
+ );
+ }
+
+ @Override
+ public boolean isAudited()
+ {
+ return false;
+ }
+
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + "{" +
+ "dataSource='" + dataSource + '\'' +
+ ", segmentIds=" + segmentIds +
+ '}';
+ }
+}
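To make the Javadoc above concrete, consider a hypothetical upgrade chain
(segment IDs are placeholders): parent P was upgraded to children C1 and C2.

    // If P and C1 are still present in the metadata store but C2 has been nuked,
    // submitting the action for {P} yields:
    //   { "P" -> { "P", "C1" } }
    // If P and all of its children have been nuked, the map has no entry for P,
    // which is how the kill task recognizes that P's load spec is unreferenced.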
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
index 4606bd597a8..973a83ecee4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
@@ -39,6 +39,8 @@ import java.util.concurrent.Future;
     @JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class),
     @JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
     @JsonSubTypes.Type(name = "retrieveSegmentsById", value = RetrieveSegmentsByIdAction.class),
+    @JsonSubTypes.Type(name = "retrieveUpgradedFromSegmentIds", value = RetrieveUpgradedFromSegmentIdsAction.class),
+    @JsonSubTypes.Type(name = "retrieveUpgradedToSegmentIds", value = RetrieveUpgradedToSegmentIdsAction.class),
     @JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class),
     @JsonSubTypes.Type(name = "segmentListUnused", value = RetrieveUnusedSegmentsAction.class),
     @JsonSubTypes.Type(name = "markSegmentsAsUnused", value = MarkSegmentsAsUnusedAction.class),
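With these registrations, the new actions travel over the wire under the type names
above. A hypothetical serialized payload for the first action would look roughly like
this (field values are placeholders):

    {
      "type": "retrieveUpgradedFromSegmentIds",
      "dataSource": "wiki",
      "segmentIds": ["segmentId1", "segmentId2"]
    }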
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedFromSegmentsResponse.java
similarity index 58%
copy from indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java
copy to indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedFromSegmentsResponse.java
index 33421eb1a5c..5f0f1775f16 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedFromSegmentsResponse.java
@@ -17,22 +17,28 @@
* under the License.
*/
-package org.apache.druid.indexing.test;
+package org.apache.druid.indexing.common.actions;
-import org.apache.druid.segment.loading.DataSegmentKiller;
-import org.apache.druid.timeline.DataSegment;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
-public class TestDataSegmentKiller implements DataSegmentKiller
+import java.util.Map;
+
+public class UpgradedFromSegmentsResponse
{
- @Override
- public void kill(DataSegment segment)
+ private final Map<String, String> upgradedFromSegmentIds;
+
+ @JsonCreator
+ public UpgradedFromSegmentsResponse(
+ @JsonProperty("upgradedFromSegmentIds") Map<String, String>
upgradedFromSegmentIds
+ )
{
- // do nothing
+ this.upgradedFromSegmentIds = upgradedFromSegmentIds;
}
- @Override
- public void killAll()
+ @JsonProperty
+ public Map<String, String> getUpgradedFromSegmentIds()
{
- throw new UnsupportedOperationException("not implemented");
+ return upgradedFromSegmentIds;
}
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedToSegmentsResponse.java
similarity index 57%
copy from indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java
copy to indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedToSegmentsResponse.java
index 33421eb1a5c..e9bf33a97ce 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedToSegmentsResponse.java
@@ -17,22 +17,30 @@
* under the License.
*/
-package org.apache.druid.indexing.test;
+package org.apache.druid.indexing.common.actions;
-import org.apache.druid.segment.loading.DataSegmentKiller;
-import org.apache.druid.timeline.DataSegment;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
-public class TestDataSegmentKiller implements DataSegmentKiller
+import java.util.Map;
+import java.util.Set;
+
+public class UpgradedToSegmentsResponse
{
- @Override
- public void kill(DataSegment segment)
+
+ private final Map<String, Set<String>> upgradedToSegmentIds;
+
+ @JsonCreator
+ public UpgradedToSegmentsResponse(
+ @JsonProperty("upgradedToSegmentIds") Map<String, Set<String>>
upgradedToSegmentIds
+ )
{
- // do nothing
+ this.upgradedToSegmentIds = upgradedToSegmentIds;
}
- @Override
- public void killAll()
+ @JsonProperty
+ public Map<String, Set<String>> getUpgradedToSegmentIds()
{
- throw new UnsupportedOperationException("not implemented");
+ return upgradedToSegmentIds;
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index fe49569a3bb..e1f6d2915ee 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -35,11 +35,14 @@ import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
+import org.apache.druid.indexing.common.actions.RetrieveUpgradedFromSegmentIdsAction;
+import org.apache.druid.indexing.common.actions.RetrieveUpgradedToSegmentIdsAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentNukeAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
+import org.apache.druid.indexing.common.actions.UpgradedToSegmentsResponse;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@@ -47,6 +50,8 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -54,6 +59,7 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -63,9 +69,23 @@ import java.util.TreeMap;
import java.util.stream.Collectors;
 /**
+ * <p/>
  * The client representation of this task is {@link ClientKillUnusedSegmentsTaskQuery}.
  * JSON serialization fields of this class must correspond to those of {@link
  * ClientKillUnusedSegmentsTaskQuery}, except for {@link #id} and {@link #context} fields.
+ * <p/>
+ * The Kill task fetches the set of used segments for the interval and computes the set of their load specs. <br/>
+ * Until `limit` segments have been processed in total or all segments for the interval have been nuked:
+ * <ol>
+ * <li> Fetch at most `batchSize` unused segments from the metadata store. </li>
+ * <li> Determine the mapping from these segments to their parents *before* nuking the segments. </li>
+ * <li> Nuke the batch of unused segments from the metadata store. </li>
+ * <li> Determine the mapping of the set of parents to all their children. </li>
+ * <li> Check if unused or parent segments exist. </li>
+ * <li> Find the unreferenced segments. </li>
+ * <li> Filter the set of unreferenced segments using load specs from the set of used segments. </li>
+ * <li> Kill the filtered set of segments from deep storage. </li>
+ * </ol>
  */
public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
@@ -76,7 +96,7 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
    * Default nuke batch size. This is a small enough size that we still get value from batching, while
    * yielding as quickly as possible. In one real cluster environment backed with mysql, ~2000rows/sec,
    * with batch size of 100, means a batch should only less than a second for the task lock, and depending
-   * on the segment store latency, unoptimised S3 cleanups typically take 5-10 seconds per 100. Over time
+   * on the segment store latency, unoptimised S3 cleanups typically take 5-10 seconds per 100. Over time,
    * we expect the S3 cleanup to get quicker, so this should be < 1 second, which means we'll be yielding
    * the task lockbox every 1-2 seconds.
    */
@@ -97,13 +117,15 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
   /**
    * Maximum number of segments that can be killed.
    */
-  @Nullable private final Integer limit;
+  @Nullable
+  private final Integer limit;

   /**
    * The maximum used status last updated time. Any segments with
    * {@code used_status_last_updated} no later than this time will be included in the kill task.
    */
-  @Nullable private final DateTime maxUsedStatusLastUpdatedTime;
+  @Nullable
+  private final DateTime maxUsedStatusLastUpdatedTime;
@JsonCreator
public KillUnusedSegmentsTask(
@@ -196,18 +218,17 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
         numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : "."
     );
+ final TaskActionClient taskActionClient = toolbox.getTaskActionClient();
     RetrieveUsedSegmentsAction retrieveUsedSegmentsAction = new RetrieveUsedSegmentsAction(
         getDataSource(),
         ImmutableList.of(getInterval()),
         Segments.INCLUDING_OVERSHADOWED
     );
     // Fetch the load specs of all segments overlapping with the unused segment intervals
-    final Set<Map<String, Object>> usedSegmentLoadSpecs =
-        new HashSet<>(toolbox.getTaskActionClient().submit(retrieveUsedSegmentsAction)
-                             .stream()
-                             .map(DataSegment::getLoadSpec)
-                             .collect(Collectors.toSet())
-        );
+    final Set<Map<String, Object>> usedSegmentLoadSpecs = taskActionClient.submit(retrieveUsedSegmentsAction)
+                                                                          .stream()
+                                                                          .map(DataSegment::getLoadSpec)
+                                                                          .collect(Collectors.toSet());
do {
if (nextBatchSize <= 0) {
@@ -231,20 +252,47 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
);
}
-      // Kill segments
-      // Order is important here: we want the nuke action to clean up the metadata records _before_ the
-      // segments are removed from storage, this helps maintain that we will always have a storage segment if
-      // the metadata segment is present. If the segment nuke throws an exception, then the segment cleanup is
-      // abandoned.
+      // Kill segments. Order is important here:
+      // Retrieve the segment upgrade infos for the batch _before_ the segments are nuked
+      // We then want the nuke action to clean up the metadata records _before_ the segments are removed from storage.
+      // This helps maintain that we will always have a storage segment if the metadata segment is present.
+      // Determine the subset of segments to be killed from deep storage based on loadspecs.
+      // If the segment nuke throws an exception, then the segment cleanup is abandoned.
+
+      // Determine upgraded segment ids before nuking
+      final Set<String> segmentIds = unusedSegments.stream()
+                                                   .map(DataSegment::getId)
+                                                   .map(SegmentId::toString)
+                                                   .collect(Collectors.toSet());
+      final Map<String, String> upgradedFromSegmentIds = new HashMap<>();
+      try {
+        upgradedFromSegmentIds.putAll(
+            taskActionClient.submit(
+                new RetrieveUpgradedFromSegmentIdsAction(getDataSource(), segmentIds)
+            ).getUpgradedFromSegmentIds()
+        );
+      }
+      catch (Exception e) {
+        LOG.warn(
+            e,
+            "Could not retrieve parent segment ids using task action[retrieveUpgradedFromSegmentIds]."
+            + " Overlord may be on an older version."
+        );
+      }

-      toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
+      // Nuke Segments
+      taskActionClient.submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));

-      // Kill segments from the deep storage only if their load specs are not being used by any used segments
-      final List<DataSegment> segmentsToBeKilled = unusedSegments
-          .stream()
-          .filter(unusedSegment -> unusedSegment.getLoadSpec() == null
-                                   || !usedSegmentLoadSpecs.contains(unusedSegment.getLoadSpec()))
-          .collect(Collectors.toList());
+      // Determine segments to be killed
+      final List<DataSegment> segmentsToBeKilled
+          = getKillableSegments(unusedSegments, upgradedFromSegmentIds, usedSegmentLoadSpecs, taskActionClient);
+
+      final Set<DataSegment> segmentsNotKilled = new HashSet<>(unusedSegments);
+      segmentsToBeKilled.forEach(segmentsNotKilled::remove);
+      LOG.infoSegments(
+          segmentsNotKilled,
+          "Skipping segment kill from deep storage as their load specs are referenced by other segments."
+      );
toolbox.getDataSegmentKiller().kill(segmentsToBeKilled);
numBatchesProcessed++;
@@ -253,7 +301,7 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
       LOG.info("Processed [%d] batches for kill task[%s].", numBatchesProcessed, getId());

       nextBatchSize = computeNextBatchSize(numSegmentsKilled);
-    } while (unusedSegments.size() != 0 && (null == numTotalBatches || numBatchesProcessed < numTotalBatches));
+    } while (!unusedSegments.isEmpty() && (null == numTotalBatches || numBatchesProcessed < numTotalBatches));
final String taskId = getId();
LOG.info(
@@ -300,6 +348,64 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
return taskLockMap;
}
+  /**
+   * Determines subset of segments without referenced load specs that can be safely killed by
+   * looking at the segment upgrades and used segment load specs
+   * @param unusedSegments input segments
+   * @param upgradedFromSegmentIds segment to parent mapping
+   * @param usedSegmentLoadSpecs load specs of used segments
+   * @param taskActionClient task action client
+   * @return list of segments to kill from deep storage
+   */
+  private List<DataSegment> getKillableSegments(
+      List<DataSegment> unusedSegments,
+      Map<String, String> upgradedFromSegmentIds,
+      Set<Map<String, Object>> usedSegmentLoadSpecs,
+      TaskActionClient taskActionClient
+  )
+  {
+    // Determine parentId for each unused segment
+    final Map<String, Set<DataSegment>> parentIdToUnusedSegments = new HashMap<>();
+    for (DataSegment segment : unusedSegments) {
+      final String segmentId = segment.getId().toString();
+      parentIdToUnusedSegments.computeIfAbsent(
+          upgradedFromSegmentIds.getOrDefault(segmentId, segmentId),
+          k -> new HashSet<>()
+      ).add(segment);
+    }
+
+    // Check if the parent or any of its children exist in metadata store
+    try {
+      UpgradedToSegmentsResponse response = taskActionClient.submit(
+          new RetrieveUpgradedToSegmentIdsAction(getDataSource(), parentIdToUnusedSegments.keySet())
+      );
+      if (response != null && response.getUpgradedToSegmentIds() != null) {
+        response.getUpgradedToSegmentIds().forEach((parent, children) -> {
+          if (!CollectionUtils.isNullOrEmpty(children)) {
+            // Do not kill segment if its parent or any of its siblings still exist in metadata store
+            parentIdToUnusedSegments.remove(parent);
+          }
+        });
+      }
+    }
+    catch (Exception e) {
+      LOG.warn(
+          e,
+          "Could not retrieve referenced ids using task action[retrieveUpgradedToSegmentIds]."
+          + " Overlord may be on an older version."
+      );
+    }
+
+    // Filter using the used segment load specs as segment upgrades predate the above task action
+    return parentIdToUnusedSegments.values()
+                                   .stream()
+                                   .flatMap(Set::stream)
+                                   .filter(segment -> !usedSegmentLoadSpecs.contains(segment.getLoadSpec()))
+                                   .collect(Collectors.toList());
+  }
+
+
@Override
public LookupLoadingSpec getLookupLoadingSpec()
{
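The referencing rule implemented by getKillableSegments() above boils down to grouping
unused segments under their root (parent) ID and dropping any group whose root still has
a surviving segment. A self-contained sketch of that rule, using plain strings instead of
Druid's types (all names here are illustrative, not from this commit):

    import java.util.*;
    import java.util.stream.Collectors;

    static Set<String> killableIds(
        Set<String> unusedIds,               // the batch of nuked segment IDs
        Map<String, String> upgradedFrom,    // child -> parent; no entry for originals
        Map<String, Set<String>> upgradedTo  // parent -> IDs still in the metadata store
    )
    {
      // Group each unused segment under its parent, or itself if it was never upgraded.
      final Map<String, Set<String>> byParent = new HashMap<>();
      for (String id : unusedIds) {
        byParent.computeIfAbsent(upgradedFrom.getOrDefault(id, id), k -> new HashSet<>()).add(id);
      }
      // Drop any group whose root still has a live segment (itself or a sibling).
      byParent.keySet().removeIf(
          parent -> !upgradedTo.getOrDefault(parent, Collections.emptySet()).isEmpty()
      );
      return byParent.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
    }

The actual implementation additionally filters out any segment whose load spec still
appears among used segments, since upgrades may predate the new task actions.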
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 133ced3907d..d6687efbf34 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -61,6 +61,7 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
+import org.apache.druid.indexing.test.TestDataSegmentKiller;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
@@ -81,7 +82,6 @@ import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
-import org.apache.druid.segment.loading.NoopDataSegmentKiller;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaCache;
@@ -130,6 +130,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
private SegmentSchemaManager segmentSchemaManager;
private SegmentSchemaCache segmentSchemaCache;
private SupervisorManager supervisorManager;
+ private TestDataSegmentKiller dataSegmentKiller;
protected File reportsFile;
@Before
@@ -169,6 +170,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
     lockbox = new TaskLockbox(taskStorage, storageCoordinator);
     segmentCacheManagerFactory = new SegmentCacheManagerFactory(TestIndex.INDEX_IO, getObjectMapper());
reportsFile = temporaryFolder.newFile();
+ dataSegmentKiller = new TestDataSegmentKiller();
}
@After
@@ -243,6 +245,11 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
return testUtils.getRowIngestionMetersFactory();
}
+ public TestDataSegmentKiller getDataSegmentKiller()
+ {
+ return dataSegmentKiller;
+ }
+
public TaskActionToolbox createTaskActionToolbox()
{
storageCoordinator.start();
@@ -265,7 +272,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
         .taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false))
         .taskActionClient(createActionClient(task))
         .segmentPusher(new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig()))
-        .dataSegmentKiller(new NoopDataSegmentKiller())
+        .dataSegmentKiller(dataSegmentKiller)
.joinableFactory(NoopJoinableFactory.INSTANCE)
.jsonMapper(objectMapper)
.taskWorkDir(baseDir)
@@ -450,7 +457,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
             .taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false))
             .taskActionClient(taskActionClient)
             .segmentPusher(new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig()))
-            .dataSegmentKiller(new NoopDataSegmentKiller())
+            .dataSegmentKiller(dataSegmentKiller)
.joinableFactory(NoopJoinableFactory.INSTANCE)
.jsonMapper(objectMapper)
.taskWorkDir(baseDir)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
index f888ace5a54..fe2b5a51c86 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
@@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.metadata.IndexerSqlMetadataStorageCoordinatorTestBase;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.timeline.DataSegment;
import org.assertj.core.api.Assertions;
@@ -72,10 +73,10 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
taskRunner = new TestTaskRunner();
final String version = DateTimes.nowUtc().toString();
-    segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version);
-    segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version);
-    segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version);
-    segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version);
+    segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version).withLoadSpec(ImmutableMap.of("k", 1));
+    segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version).withLoadSpec(ImmutableMap.of("k", 2));
+    segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version).withLoadSpec(ImmutableMap.of("k", 3));
+    segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version).withLoadSpec(ImmutableMap.of("k", 4));
}
@Test
@@ -125,6 +126,212 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
new KillTaskReport.Stats(1, 2),
getReportedStats()
);
+    Assert.assertEquals(ImmutableSet.of(segment3), getDataSegmentKiller().getKilledSegments());
+ }
+
+ @Test
+ public void testKillSegmentsDeleteUnreferencedSiblings() throws Exception
+ {
+ final Map<String, String> upgradeSegmentMapping = ImmutableMap.of(
+ segment1.getId().toString(),
+ "nonExistentParent",
+ segment2.getId().toString(),
+ "nonExistentParent"
+ );
+    insertUsedSegments(ImmutableSet.of(segment1, segment2), upgradeSegmentMapping);
+    getStorageCoordinator().markSegmentsAsUnusedWithinInterval(DATA_SOURCE, Intervals.ETERNITY);
+
+
+ final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
+ .dataSource(DATA_SOURCE)
+ .interval(Intervals.ETERNITY)
+ .build();
+
+    Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
+
+ final List<DataSegment> observedUnusedSegments =
+ getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ Intervals.ETERNITY,
+ null,
+ null,
+ null
+ );
+
+ Assert.assertEquals(Collections.emptyList(), observedUnusedSegments);
+
+ Assert.assertEquals(
+ new KillTaskReport.Stats(2, 2),
+ getReportedStats()
+ );
+    Assert.assertEquals(ImmutableSet.of(segment1, segment2), getDataSegmentKiller().getKilledSegments());
+ }
+
+ @Test
+ public void testKillSegmentsDoNotDeleteReferencedSibling() throws Exception
+ {
+ final Map<String, String> upgradeSegmentMapping = ImmutableMap.of(
+ segment1.getId().toString(),
+ "nonExistentParent",
+ segment2.getId().toString(),
+ "nonExistentParent"
+ );
+    insertUsedSegments(ImmutableSet.of(segment1, segment2), upgradeSegmentMapping);
+    getStorageCoordinator().markSegmentsAsUnusedWithinInterval(DATA_SOURCE, Intervals.ETERNITY);
+
+
+ final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
+ .dataSource(DATA_SOURCE)
+ .interval(segment1.getInterval())
+ .build();
+
+    Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
+
+ final List<DataSegment> observedUnusedSegments =
+ getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ Intervals.ETERNITY,
+ null,
+ null,
+ null
+ );
+
+    Assert.assertEquals(Collections.singletonList(segment2), observedUnusedSegments);
+
+ Assert.assertEquals(
+ new KillTaskReport.Stats(0, 2),
+ getReportedStats()
+ );
+    Assert.assertEquals(Collections.emptySet(), getDataSegmentKiller().getKilledSegments());
+ }
+
+ @Test
+  public void testKillSegmentsDoNotDeleteParentWithReferencedChildren() throws Exception
+ {
+ final Map<String, String> upgradeSegmentMapping = ImmutableMap.of(
+ segment1.getId().toString(),
+ segment3.getId().toString(),
+ segment2.getId().toString(),
+ segment3.getId().toString()
+ );
+    insertUsedSegments(ImmutableSet.of(segment1, segment2, segment3), upgradeSegmentMapping);
+ getSegmentsMetadataManager().markSegmentAsUnused(segment2.getId());
+ getSegmentsMetadataManager().markSegmentAsUnused(segment3.getId());
+
+
+ final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
+ .dataSource(DATA_SOURCE)
+ .interval(Intervals.ETERNITY)
+ .build();
+
+    Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
+
+ final List<DataSegment> observedUnusedSegments =
+ getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ Intervals.ETERNITY,
+ null,
+ null,
+ null
+ );
+ Assert.assertEquals(ImmutableList.of(), observedUnusedSegments);
+ Assertions.assertThat(
+ getMetadataStorageCoordinator().retrieveUsedSegmentsForInterval(
+ DATA_SOURCE,
+ Intervals.ETERNITY,
+ Segments.ONLY_VISIBLE
+ )
+ ).containsExactlyInAnyOrder(segment1);
+
+ Assert.assertEquals(
+ new KillTaskReport.Stats(0, 2),
+ getReportedStats()
+ );
+    Assert.assertEquals(Collections.emptySet(), getDataSegmentKiller().getKilledSegments());
+ }
+
+ @Test
+  public void testKillSegmentsDoNotDeleteChildrenWithReferencedParent() throws Exception
+ {
+ final Map<String, String> upgradeSegmentMapping = ImmutableMap.of(
+ segment1.getId().toString(),
+ segment3.getId().toString(),
+ segment2.getId().toString(),
+ segment3.getId().toString()
+ );
+    insertUsedSegments(ImmutableSet.of(segment1, segment2, segment3), upgradeSegmentMapping);
+ getSegmentsMetadataManager().markSegmentAsUnused(segment1.getId());
+ getSegmentsMetadataManager().markSegmentAsUnused(segment2.getId());
+
+
+ final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
+ .dataSource(DATA_SOURCE)
+ .interval(Intervals.ETERNITY)
+ .build();
+
+    Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
+
+ final List<DataSegment> observedUnusedSegments =
+ getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ Intervals.ETERNITY,
+ null,
+ null,
+ null
+ );
+ Assert.assertEquals(ImmutableList.of(), observedUnusedSegments);
+ Assertions.assertThat(
+ getMetadataStorageCoordinator().retrieveUsedSegmentsForInterval(
+ DATA_SOURCE,
+ Intervals.ETERNITY,
+ Segments.ONLY_VISIBLE
+ )
+ ).containsExactlyInAnyOrder(segment3);
+
+ Assert.assertEquals(
+ new KillTaskReport.Stats(0, 2),
+ getReportedStats()
+ );
+    Assert.assertEquals(Collections.emptySet(), getDataSegmentKiller().getKilledSegments());
+ }
+
+ @Test
+ public void testKillSegmentsDeleteChildrenAndParent() throws Exception
+ {
+ final Map<String, String> upgradeSegmentMapping = ImmutableMap.of(
+ segment1.getId().toString(),
+ segment3.getId().toString(),
+ segment2.getId().toString(),
+ segment3.getId().toString()
+ );
+    insertUsedSegments(ImmutableSet.of(segment1, segment2, segment3), upgradeSegmentMapping);
+ getSegmentsMetadataManager().markSegmentAsUnused(segment1.getId());
+ getSegmentsMetadataManager().markSegmentAsUnused(segment2.getId());
+ getSegmentsMetadataManager().markSegmentAsUnused(segment3.getId());
+
+
+ final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder()
+ .dataSource(DATA_SOURCE)
+ .interval(Intervals.ETERNITY)
+ .build();
+
+    Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
+
+ final List<DataSegment> observedUnusedSegments =
+ getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ Intervals.ETERNITY,
+ null,
+ null,
+ null
+ );
+ Assert.assertEquals(ImmutableList.of(), observedUnusedSegments);
+
+ Assert.assertEquals(
+ new KillTaskReport.Stats(3, 2),
+ getReportedStats()
+ );
+    Assert.assertEquals(ImmutableSet.of(segment1, segment2, segment3), getDataSegmentKiller().getKilledSegments());
}
@Test
@@ -1247,4 +1454,16 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
10L
);
}
+
+  private void insertUsedSegments(Set<DataSegment> segments, Map<String, String> upgradedFromSegmentIdMap)
+  {
+    final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
+    IndexerSqlMetadataStorageCoordinatorTestBase.insertUsedSegments(
+        segments,
+        upgradedFromSegmentIdMap,
+        derbyConnectorRule.getConnector(),
+        table,
+        getObjectMapper()
+    );
+  }
+ }
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java
index 33421eb1a5c..92581f6dd1e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestDataSegmentKiller.java
@@ -22,12 +22,18 @@ package org.apache.druid.indexing.test;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.timeline.DataSegment;
+import java.util.HashSet;
+import java.util.Set;
+
public class TestDataSegmentKiller implements DataSegmentKiller
{
+
+ private final Set<DataSegment> killedSegments = new HashSet<>();
+
@Override
public void kill(DataSegment segment)
{
- // do nothing
+ killedSegments.add(segment);
}
@Override
@@ -35,4 +41,9 @@ public class TestDataSegmentKiller implements DataSegmentKiller
{
throw new UnsupportedOperationException("not implemented");
}
+
+ public Set<DataSegment> getKilledSegments()
+ {
+ return killedSegments;
+ }
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index 61a57e94842..d2055d6e0c9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -314,6 +314,24 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
throw new UnsupportedOperationException();
}
+ @Override
+ public Map<String, String> retrieveUpgradedFromSegmentIds(
+ final String dataSource,
+ final Set<String> segmentIds
+ )
+ {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public Map<String, Set<String>> retrieveUpgradedToSegmentIds(
+ final String dataSource,
+ final Set<String> segmentIds
+ )
+ {
+ return Collections.emptyMap();
+ }
+
public Set<DataSegment> getPublished()
{
return ImmutableSet.copyOf(published);
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index c055a8d9e9f..83b4ac7e474 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -473,4 +473,21 @@ public interface IndexerMetadataStorageCoordinator
* @return List of pending segment records
*/
   List<PendingSegmentRecord> getPendingSegments(String datasource, Interval interval);
+
+  /**
+   * Map from a segment ID to the segment ID from which it was upgraded
+   * There should be no entry in the map for an original non-upgraded segment
+   * @param dataSource data source
+   * @param segmentIds ids of segments
+   */
+  Map<String, String> retrieveUpgradedFromSegmentIds(String dataSource, Set<String> segmentIds);
+
+  /**
+   * Map from a segment ID to a set containing
+   * 1) all segment IDs that were upgraded from it AND are still present in the metadata store
+   * 2) the segment ID itself if and only if it is still present in the metadata store
+   * @param dataSource data source
+   * @param segmentIds ids of the first segments which had the corresponding load spec
+   */
+  Map<String, Set<String>> retrieveUpgradedToSegmentIds(String dataSource, Set<String> segmentIds);
}
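A concrete (hypothetical) illustration of the two contracts: if segment S1 was upgraded
to S2 and S3, and S3 has since been deleted from the metadata store, then
retrieveUpgradedFromSegmentIds(ds, {S2, S3}) returns {S2 -> S1}, while
retrieveUpgradedToSegmentIds(ds, {S1}) returns {S1 -> {S1, S2}}, assuming S1 itself is
still present.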
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index fd637728908..54f75ccb920 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -564,6 +564,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
               createNewIdsOfAppendSegmentsAfterReplace(handle, replaceSegments, locksHeldByReplaceTask);

           Map<SegmentId, SegmentMetadata> upgradeSegmentMetadata = new HashMap<>();
+          final Map<String, String> upgradedFromSegmentIdMap = new HashMap<>();
           for (DataSegmentPlus dataSegmentPlus : upgradedSegments) {
             segmentsToInsert.add(dataSegmentPlus.getDataSegment());
             if (dataSegmentPlus.getSchemaFingerprint() != null && dataSegmentPlus.getNumRows() != null) {
@@ -572,6 +573,12 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
                   new SegmentMetadata(dataSegmentPlus.getNumRows(), dataSegmentPlus.getSchemaFingerprint())
               );
             }
+            if (dataSegmentPlus.getUpgradedFromSegmentId() != null) {
+              upgradedFromSegmentIdMap.put(
+                  dataSegmentPlus.getDataSegment().getId().toString(),
+                  dataSegmentPlus.getUpgradedFromSegmentId()
+              );
+            }
}
SegmentPublishResult result = SegmentPublishResult.ok(
insertSegments(
@@ -579,7 +586,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
segmentsToInsert,
segmentSchemaMapping,
upgradeSegmentMetadata,
- Collections.emptyMap()
+ Collections.emptyMap(),
+ upgradedFromSegmentIdMap
),
upgradePendingSegmentsOverlappingWith(segmentsToInsert)
);
@@ -1408,6 +1416,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
     final Set<DataSegment> allSegmentsToInsert = new HashSet<>(appendSegments);
     final Map<SegmentId, SegmentId> newVersionSegmentToParent = new HashMap<>();
     final Map<String, DataSegment> segmentIdMap = new HashMap<>();
+    final Map<String, String> upgradedFromSegmentIdMap = new HashMap<>();
     appendSegments.forEach(segment -> segmentIdMap.put(segment.getId().toString(), segment));
     segmentIdsForNewVersions.forEach(
         pendingSegment -> {
@@ -1415,6 +1424,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
           final DataSegment oldSegment = segmentIdMap.get(pendingSegment.getUpgradedFromSegmentId());
           final SegmentId newVersionSegmentId = pendingSegment.getId().asSegmentId();
           newVersionSegmentToParent.put(newVersionSegmentId, oldSegment.getId());
+          upgradedFromSegmentIdMap.put(newVersionSegmentId.toString(), oldSegment.getId().toString());
allSegmentsToInsert.add(
new DataSegment(
pendingSegment.getId().asSegmentId(),
@@ -1473,7 +1483,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
allSegmentsToInsert,
segmentSchemaMapping,
Collections.emptyMap(),
- newVersionSegmentToParent
+ newVersionSegmentToParent,
+ upgradedFromSegmentIdMap
)
);
},
@@ -2092,7 +2103,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
             .bind("version", segment.getVersion())
             .bind("used", usedSegments.contains(segment))
             .bind("payload", jsonMapper.writeValueAsBytes(segment))
-            .bind("used_status_last_updated", now);
+            .bind("used_status_last_updated", now)
+            .bind("upgraded_from_segment_id", (String) null);
if (schemaPersistEnabled) {
Long numRows = null;
@@ -2217,6 +2229,11 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
             .shardSpec(shardSpec)
             .build();

+        // When the segment already has an upgraded_from_segment_id, reuse it for its children
+        final String upgradedFromSegmentId = oldSegmentMetadata.getUpgradedFromSegmentId() == null
+                                             ? oldSegmentMetadata.getDataSegment().getId().toString()
+                                             : oldSegmentMetadata.getUpgradedFromSegmentId();
+
upgradedSegments.add(
new DataSegmentPlus(
dataSegment,
@@ -2224,7 +2241,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
null,
null,
oldSegmentMetadata.getSchemaFingerprint(),
- oldSegmentMetadata.getNumRows())
+ oldSegmentMetadata.getNumRows(),
+ upgradedFromSegmentId
+ )
);
}
@@ -2266,7 +2285,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
       Set<DataSegment> segments,
       @Nullable SegmentSchemaMapping segmentSchemaMapping,
       Map<SegmentId, SegmentMetadata> upgradeSegmentMetadata,
-      Map<SegmentId, SegmentId> newVersionForAppendToParent
+      Map<SegmentId, SegmentId> newVersionForAppendToParent,
+      Map<String, String> upgradedFromSegmentIdMap
   ) throws IOException
   {
     boolean shouldPersistSchema = shouldPersistSchema(segmentSchemaMapping);
@@ -2302,7 +2322,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
             .bind("version", segment.getVersion())
             .bind("used", true)
             .bind("payload", jsonMapper.writeValueAsBytes(segment))
-            .bind("used_status_last_updated", now);
+            .bind("used_status_last_updated", now)
+            .bind("upgraded_from_segment_id", upgradedFromSegmentIdMap.get(segment.getId().toString()));
if (schemaPersistEnabled) {
SegmentMetadata segmentMetadata =
@@ -2449,9 +2470,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
   {
     String insertStatement =
         "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s,"
-        + " partitioned, version, used, payload, used_status_last_updated %3$s) "
+        + " partitioned, version, used, payload, used_status_last_updated, upgraded_from_segment_id %3$s) "
        + "VALUES (:id, :dataSource, :created_date, :start, :end,"
-        + " :partitioned, :version, :used, :payload, :used_status_last_updated %4$s)";
+        + " :partitioned, :version, :used, :payload, :used_status_last_updated, :upgraded_from_segment_id %4$s)";
if (schemaPersistEnabled) {
return StringUtils.format(
@@ -2923,6 +2944,87 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
);
}
+  @Override
+  public Map<String, String> retrieveUpgradedFromSegmentIds(
+      final String dataSource,
+      final Set<String> segmentIds
+  )
+  {
+    if (segmentIds.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    final List<String> segmentIdList = ImmutableList.copyOf(segmentIds);
+    final String sql = StringUtils.format(
+        "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s",
+        dbTables.getSegmentsTable(),
+        SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id", segmentIdList)
+    );
+    final Map<String, String> upgradedFromSegmentIds = new HashMap<>();
+    connector.retryWithHandle(
+        handle -> {
+          Query<Map<String, Object>> query = handle.createQuery(sql)
+                                                   .bind("dataSource", dataSource);
+          SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition("id", segmentIdList, query);
+          return query.map((index, r, ctx) -> {
+            final String id = r.getString(1);
+            final String upgradedFromSegmentId = r.getString(2);
+            if (upgradedFromSegmentId != null) {
+              upgradedFromSegmentIds.put(id, upgradedFromSegmentId);
+            }
+            return null;
+          }).list();
+        }
+    );
+    return upgradedFromSegmentIds;
+  }
+
+  @Override
+  public Map<String, Set<String>> retrieveUpgradedToSegmentIds(
+      final String dataSource,
+      final Set<String> segmentIds
+  )
+  {
+    if (segmentIds.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    final List<String> upgradedFromSegmentIdList = ImmutableList.copyOf(segmentIds);
+    final String sql = StringUtils.format(
+        "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s",
+        dbTables.getSegmentsTable(),
+        SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn(
+            "upgraded_from_segment_id",
+            upgradedFromSegmentIdList
+        )
+    );
+    final Map<String, Set<String>> upgradedToSegmentIds = new HashMap<>();
+    retrieveSegmentsById(dataSource, segmentIds)
+        .stream()
+        .map(DataSegment::getId)
+        .map(SegmentId::toString)
+        .forEach(id -> upgradedToSegmentIds.computeIfAbsent(id, k -> new HashSet<>()).add(id));
+    connector.retryWithHandle(
+        handle -> {
+          Query<Map<String, Object>> query = handle.createQuery(sql)
+                                                   .bind("dataSource", dataSource);
+          SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition(
+              "upgraded_from_segment_id",
+              upgradedFromSegmentIdList,
+              query
+          );
+          return query.map((index, r, ctx) -> {
+            final String upgradedToId = r.getString(1);
+            final String id = r.getString(2);
+            upgradedToSegmentIds.computeIfAbsent(id, k -> new HashSet<>())
+                                .add(upgradedToId);
+            return null;
+          }).list();
+        }
+    );
+    return upgradedToSegmentIds;
+  }
+
+
private static class PendingSegmentsRecord
{
private final String sequenceName;
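For reference, with two hypothetical segment IDs the statement built by
retrieveUpgradedFromSegmentIds() expands to roughly the following (assuming the default
druid_segments table; the bound parameter names come from
getParameterizedInConditionForColumn):

    SELECT id, upgraded_from_segment_id
    FROM druid_segments
    WHERE dataSource = :dataSource AND id IN (:id0, :id1)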
diff --git a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java
index bfbaad18ef1..f117fe7f28b 100644
--- a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java
+++ b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java
@@ -40,7 +40,10 @@ import java.sql.ResultSet;
  * <li> id -> id (Unique identifier for pending segment) <li/>
  * <li> sequence_name -> sequenceName (sequence name used for segment allocation) <li/>
  * <li> sequence_prev_id -> sequencePrevId (previous segment id used for segment allocation) <li/>
- * <li> upgraded_from_segment_id -> upgradedFromSegmentId (Id of the root segment from which this was upgraded) <li/>
+ * <li> upgraded_from_segment_id -> upgradedFromSegmentId
+ * (ID of the segment which was upgraded to create the current segment.
+ * If the former was itself created as a result of an upgrade, then this ID
+ * must refer to the original non-upgraded segment in the hierarchy.) <li/>
  * <li> task_allocator_id -> taskAllocatorId (Associates a task / task group / replica group with the pending segment) <li/>
* </ul>
*/
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
index 2d315d19fc8..dc87b9fc2fd 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
@@ -587,6 +587,8 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
Map<String, String> columnNameTypes = new HashMap<>();
columnNameTypes.put("used_status_last_updated", "VARCHAR(255)");
+ columnNameTypes.put("upgraded_from_segment_id", "VARCHAR(255)");
+
if (centralizedDatasourceSchemaConfig.isEnabled()) {
columnNameTypes.put("schema_fingerprint", "VARCHAR(255)");
columnNameTypes.put("num_rows", "BIGINT");
@@ -619,6 +621,14 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
}
alterTable(tableName, alterCommands);
+
+ final Set<String> createdIndexSet = getIndexOnTable(tableName);
+ createIndex(
+ tableName,
+ StringUtils.format("idx_%1$s_datasource_upgraded_from_segment_id",
tableName),
+ ImmutableList.of("dataSource", "upgraded_from_segment_id"),
+ createdIndexSet
+ );
}
@Override
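On an existing cluster, the migration above amounts to roughly the following DDL
(assuming the default druid_segments table name; exact syntax and the "create only if
missing" checks vary by dialect):

    ALTER TABLE druid_segments ADD upgraded_from_segment_id VARCHAR(255);
    CREATE INDEX idx_druid_segments_datasource_upgraded_from_segment_id
        ON druid_segments (dataSource, upgraded_from_segment_id);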
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index f14cc995050..fc1c84a7037 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -286,7 +286,7 @@ public class SqlSegmentsMetadataQuery
if (includeSchemaInfo) {
final Query<Map<String, Object>> query = handle.createQuery(
StringUtils.format(
- "SELECT payload, used, schema_fingerprint, num_rows FROM %s
WHERE dataSource = :dataSource %s",
+ "SELECT payload, used, schema_fingerprint, num_rows,
upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s",
dbTables.getSegmentsTable(),
getParameterizedInConditionForColumn("id", segmentIds)
)
);
@@ -306,7 +306,8 @@ public class SqlSegmentsMetadataQuery
null,
r.getBoolean(2),
schemaFingerprint,
- numRows
+ numRows,
+ r.getString(5)
);
}
)
@@ -314,7 +315,7 @@ public class SqlSegmentsMetadataQuery
} else {
final Query<Map<String, Object>> query = handle.createQuery(
StringUtils.format(
- "SELECT payload, used FROM %s WHERE dataSource = :dataSource %s",
+ "SELECT payload, used, upgraded_from_segment_id FROM %s WHERE
dataSource = :dataSource %s",
dbTables.getSegmentsTable(),
getParameterizedInConditionForColumn("id", segmentIds)
)
);
@@ -331,7 +332,8 @@ public class SqlSegmentsMetadataQuery
null,
r.getBoolean(2),
null,
- null
+ null,
+ r.getString(3)
)
)
.iterator();
@@ -864,6 +866,7 @@ public class SqlSegmentsMetadataQuery
DateTimes.of(r.getString(3)),
null,
null,
+ null,
null
))
.iterator();
@@ -980,7 +983,7 @@ public class SqlSegmentsMetadataQuery
*
* @see #getParameterizedInConditionForColumn(String, List)
*/
- private static void bindColumnValuesToQueryWithInCondition(
+ static void bindColumnValuesToQueryWithInCondition(
final String columnName,
final List<String> values,
final SQLStatement<?> query
diff --git a/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java b/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java
index 9841e09a1a7..bfda5cbf3ad 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSegmentPlus.java
@@ -36,6 +36,8 @@ import java.util.Objects;
  * <li>{@link DataSegmentPlus#createdDate} - The time when the segment was created.</li>
  * <li>{@link DataSegmentPlus#usedStatusLastUpdatedDate} - The time when the segments
  * used status was last updated.</li>
+ * <li>{@link DataSegmentPlus#upgradedFromSegmentId} - The segment id to which the same load spec originally belonged.
+ * Load specs can be shared as a result of segment version upgrades.</li>
  * </ul>
* <p>
 * This class closely resembles the row structure of the {@link MetadataStorageTablesConfig#getSegmentsTable()}.
@@ -53,6 +55,9 @@ public class DataSegmentPlus
private final String schemaFingerprint;
private final Long numRows;
+ @Nullable
+ private final String upgradedFromSegmentId;
+
@JsonCreator
public DataSegmentPlus(
@JsonProperty("dataSegment") final DataSegment dataSegment,
@@ -60,7 +65,8 @@ public class DataSegmentPlus
@JsonProperty("usedStatusLastUpdatedDate") @Nullable final DateTime
usedStatusLastUpdatedDate,
@JsonProperty("used") @Nullable final Boolean used,
@JsonProperty("schemaFingerprint") @Nullable final String
schemaFingerprint,
- @JsonProperty("numRows") @Nullable final Long numRows
+ @JsonProperty("numRows") @Nullable final Long numRows,
+ @JsonProperty("upgradedFromSegmentId") @Nullable final String
upgradedFromSegmentId
)
{
this.dataSegment = dataSegment;
@@ -69,6 +75,7 @@ public class DataSegmentPlus
this.used = used;
this.schemaFingerprint = schemaFingerprint;
this.numRows = numRows;
+ this.upgradedFromSegmentId = upgradedFromSegmentId;
}
@Nullable
@@ -112,6 +119,13 @@ public class DataSegmentPlus
return numRows;
}
+ @Nullable
+ @JsonProperty
+ public String getUpgradedFromSegmentId()
+ {
+ return upgradedFromSegmentId;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -127,7 +141,8 @@ public class DataSegmentPlus
         && Objects.equals(usedStatusLastUpdatedDate, that.getUsedStatusLastUpdatedDate())
         && Objects.equals(used, that.getUsed())
         && Objects.equals(schemaFingerprint, that.getSchemaFingerprint())
-        && Objects.equals(numRows, that.getNumRows());
+        && Objects.equals(numRows, that.getNumRows())
+        && Objects.equals(upgradedFromSegmentId, that.getUpgradedFromSegmentId());
}
@Override
@@ -139,7 +154,8 @@ public class DataSegmentPlus
usedStatusLastUpdatedDate,
used,
schemaFingerprint,
- numRows
+ numRows,
+ upgradedFromSegmentId
);
}
@@ -153,6 +169,7 @@ public class DataSegmentPlus
", used=" + getUsed() +
", schemaFingerprint=" + getSchemaFingerprint() +
", numRows=" + getNumRows() +
+ ", upgradedFromSegmentId=" + getUpgradedFromSegmentId() +
'}';
}
}
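Since the new field is an ordinary @JsonProperty, a serialized DataSegmentPlus now
carries it alongside the existing metadata. A hypothetical, abbreviated form (the
dataSegment payload is elided):

    {
      "dataSegment": { ... },
      "createdDate": "2024-01-01T00:00:00.000Z",
      "used": false,
      "upgradedFromSegmentId": "segmentId1"
    }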
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 222c1ece89f..f352d5e2609 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -138,8 +138,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
final String v1 = "2023-01-01";
final String v2 = "2023-01-02";
final String v3 = "2023-01-03";
+ final String alreadyUpgradedVersion = "2023-02-01";
final String lockVersion = "2024-01-01";
+ final String taskAllocatorId = "appendTask";
final String replaceTaskId = "replaceTask1";
final ReplaceTaskLock replaceLock = new ReplaceTaskLock(
replaceTaskId,
@@ -148,6 +150,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
);
final Set<DataSegment> appendSegments = new HashSet<>();
+ final List<PendingSegmentRecord> pendingSegmentsForTask = new ArrayList<>();
final Set<DataSegment> expectedSegmentsToUpgrade = new HashSet<>();
for (int i = 0; i < 10; i++) {
final DataSegment segment = createSegment(
@@ -157,6 +160,31 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
);
appendSegments.add(segment);
expectedSegmentsToUpgrade.add(segment);
+ // Add the same segment
+ pendingSegmentsForTask.add(
+ new PendingSegmentRecord(
+ SegmentIdWithShardSpec.fromDataSegment(segment),
+ v1,
+ segment.getId().toString(),
+ null,
+ taskAllocatorId
+ )
+ );
+ // Add upgraded pending segment
+ pendingSegmentsForTask.add(
+ new PendingSegmentRecord(
+ new SegmentIdWithShardSpec(
+ DS.WIKI,
+ Intervals.of("2023-01-01/2023-02-01"),
+ alreadyUpgradedVersion,
+ new NumberedShardSpec(i, 0)
+ ),
+ alreadyUpgradedVersion,
+ segment.getId().toString(),
+ segment.getId().toString(),
+ taskAllocatorId
+ )
+ );
}
for (int i = 0; i < 10; i++) {
@@ -167,6 +195,31 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
);
appendSegments.add(segment);
expectedSegmentsToUpgrade.add(segment);
+ // Add the same segment
+ pendingSegmentsForTask.add(
+ new PendingSegmentRecord(
+ SegmentIdWithShardSpec.fromDataSegment(segment),
+ v2,
+ segment.getId().toString(),
+ null,
+ taskAllocatorId
+ )
+ );
+ // Add upgraded pending segment
+ pendingSegmentsForTask.add(
+ new PendingSegmentRecord(
+ new SegmentIdWithShardSpec(
+ DS.WIKI,
+ Intervals.of("2023-01-01/2023-02-01"),
+ alreadyUpgradedVersion,
+ new NumberedShardSpec(10 + i, 0)
+ ),
+ alreadyUpgradedVersion,
+ segment.getId().toString(),
+ segment.getId().toString(),
+ taskAllocatorId
+ )
+ );
}
for (int i = 0; i < 10; i++) {
@@ -176,23 +229,78 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
new LinearShardSpec(i)
);
appendSegments.add(segment);
+ // Add the same segment
+ pendingSegmentsForTask.add(
+ new PendingSegmentRecord(
+ SegmentIdWithShardSpec.fromDataSegment(segment),
+ v3,
+ segment.getId().toString(),
+ null,
+ taskAllocatorId
+ )
+ );
+ // Add upgraded pending segment
+ pendingSegmentsForTask.add(
+ new PendingSegmentRecord(
+ new SegmentIdWithShardSpec(
+ DS.WIKI,
+ Intervals.of("2023-01-01/2023-02-01"),
+ alreadyUpgradedVersion,
+ new NumberedShardSpec(20 + i, 0)
+ ),
+ alreadyUpgradedVersion,
+ segment.getId().toString(),
+ segment.getId().toString(),
+ taskAllocatorId
+ )
+ );
}
+ derbyConnector.retryWithHandle(
+ handle -> coordinator.insertPendingSegmentsIntoMetastore(handle, pendingSegmentsForTask, DS.WIKI, false)
+ );
+
final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock
= expectedSegmentsToUpgrade.stream()
.collect(Collectors.toMap(s -> s, s -> replaceLock));
// Commit the segment and verify the results
SegmentPublishResult commitResult
- = coordinator.commitAppendSegments(appendSegments, segmentToReplaceLock, "append", null);
+ = coordinator.commitAppendSegments(appendSegments, segmentToReplaceLock, taskAllocatorId, null);
Assert.assertTrue(commitResult.isSuccess());
- Assert.assertEquals(appendSegments, commitResult.getSegments());
- // Verify the segments present in the metadata store
- Assert.assertEquals(
- appendSegments,
- ImmutableSet.copyOf(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()))
+ Set<DataSegment> allCommittedSegments
+ = new HashSet<>(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()));
+ Map<String, String> upgradedFromSegmentIdMap = coordinator.retrieveUpgradedFromSegmentIds(
+ DS.WIKI,
+ allCommittedSegments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet())
);
+ // Verify the segments present in the metadata store
+ Assert.assertTrue(allCommittedSegments.containsAll(appendSegments));
+ for (DataSegment segment : appendSegments) {
+ Assert.assertNull(upgradedFromSegmentIdMap.get(segment.getId().toString()));
+ }
+ allCommittedSegments.removeAll(appendSegments);
+
+ // Verify the commit of upgraded pending segments
+ Assert.assertEquals(appendSegments.size(), allCommittedSegments.size());
+ Map<String, DataSegment> segmentMap = new HashMap<>();
+ for (DataSegment segment : appendSegments) {
+ segmentMap.put(segment.getId().toString(), segment);
+ }
+ for (DataSegment segment : allCommittedSegments) {
+ for (PendingSegmentRecord pendingSegmentRecord : pendingSegmentsForTask) {
+ if (pendingSegmentRecord.getId().asSegmentId().toString().equals(segment.getId().toString())) {
+ DataSegment upgradedFromSegment = segmentMap.get(pendingSegmentRecord.getUpgradedFromSegmentId());
+ Assert.assertNotNull(upgradedFromSegment);
+ Assert.assertEquals(segment.getLoadSpec(), upgradedFromSegment.getLoadSpec());
+ Assert.assertEquals(
+ pendingSegmentRecord.getUpgradedFromSegmentId(),
+ upgradedFromSegmentIdMap.get(segment.getId().toString())
+ );
+ }
+ }
+ }
// Verify entries in the segment task lock table
final Set<String> expectedUpgradeSegmentIds
@@ -290,12 +398,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
retrieveUsedSegmentIds(derbyConnectorRule.metadataTablesConfigSupplier().get()).size()
);
- final Set<DataSegment> usedSegments = new HashSet<>(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()));
+ final Set<DataSegment> usedSegments
+ = new HashSet<>(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()));
+
+ final Map<String, String> upgradedFromSegmentIdMap = coordinator.retrieveUpgradedFromSegmentIds(
+ "foo",
+ usedSegments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet())
+ );
Assert.assertTrue(usedSegments.containsAll(segmentsAppendedWithReplaceLock));
+ for (DataSegment appendSegment : segmentsAppendedWithReplaceLock) {
+ Assert.assertNull(upgradedFromSegmentIdMap.get(appendSegment.getId().toString()));
+ }
usedSegments.removeAll(segmentsAppendedWithReplaceLock);
Assert.assertTrue(usedSegments.containsAll(replacingSegments));
+ for (DataSegment replaceSegment : replacingSegments) {
+ Assert.assertNull(upgradedFromSegmentIdMap.get(replaceSegment.getId().toString()));
+ }
usedSegments.removeAll(replacingSegments);
Assert.assertEquals(segmentsAppendedWithReplaceLock.size(), usedSegments.size());
@@ -303,6 +423,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
boolean hasBeenCarriedForward = false;
for (DataSegment appendedSegment : segmentsAppendedWithReplaceLock) {
if (appendedSegment.getLoadSpec().equals(segmentReplicaWithNewVersion.getLoadSpec())) {
+ Assert.assertEquals(
+ appendedSegment.getId().toString(),
+ upgradedFromSegmentIdMap.get(segmentReplicaWithNewVersion.getId().toString())
+ );
hasBeenCarriedForward = true;
break;
}
@@ -3300,4 +3424,63 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
unusedSegmentIdsForIntervalAndVersion.get(0)
);
}
+
+ @Test
+ public void testRetrieveUpgradedFromSegmentIds()
+ {
+ final String datasource = defaultSegment.getDataSource();
+ final Map<String, String> upgradedFromSegmentIdMap = new HashMap<>();
+ upgradedFromSegmentIdMap.put(defaultSegment2.getId().toString(), defaultSegment.getId().toString());
+ insertUsedSegments(ImmutableSet.of(defaultSegment, defaultSegment2), upgradedFromSegmentIdMap);
+ coordinator.markSegmentsAsUnusedWithinInterval(datasource, Intervals.ETERNITY);
+ upgradedFromSegmentIdMap.clear();
+ upgradedFromSegmentIdMap.put(defaultSegment3.getId().toString(), defaultSegment.getId().toString());
+ insertUsedSegments(ImmutableSet.of(defaultSegment3, defaultSegment4), upgradedFromSegmentIdMap);
+
+ Map<String, String> expected = new HashMap<>();
+ expected.put(defaultSegment2.getId().toString(), defaultSegment.getId().toString());
+ expected.put(defaultSegment3.getId().toString(), defaultSegment.getId().toString());
+
+ Set<String> segmentIds = new HashSet<>();
+ segmentIds.add(defaultSegment.getId().toString());
+ segmentIds.add(defaultSegment2.getId().toString());
+ segmentIds.add(defaultSegment3.getId().toString());
+ segmentIds.add(defaultSegment4.getId().toString());
+ Assert.assertEquals(
+ expected,
+ coordinator.retrieveUpgradedFromSegmentIds(datasource, segmentIds)
+ );
+ }
+
+ @Test
+ public void testRetrieveUpgradedToSegmentIds()
+ {
+ final String datasource = defaultSegment.getDataSource();
+ final Map<String, String> upgradedFromSegmentIdMap = new HashMap<>();
+ upgradedFromSegmentIdMap.put(defaultSegment2.getId().toString(), defaultSegment.getId().toString());
+ insertUsedSegments(ImmutableSet.of(defaultSegment, defaultSegment2), upgradedFromSegmentIdMap);
+ coordinator.markSegmentsAsUnusedWithinInterval(datasource, Intervals.ETERNITY);
+ upgradedFromSegmentIdMap.clear();
+ upgradedFromSegmentIdMap.put(defaultSegment3.getId().toString(), defaultSegment.getId().toString());
+ insertUsedSegments(ImmutableSet.of(defaultSegment3, defaultSegment4), upgradedFromSegmentIdMap);
+
+ Map<String, Set<String>> expected = new HashMap<>();
+ expected.put(defaultSegment.getId().toString(), new HashSet<>());
+ expected.get(defaultSegment.getId().toString()).add(defaultSegment.getId().toString());
+ expected.get(defaultSegment.getId().toString()).add(defaultSegment2.getId().toString());
+ expected.get(defaultSegment.getId().toString()).add(defaultSegment3.getId().toString());
+
+ Set<String> upgradedIds = new HashSet<>();
+ upgradedIds.add(defaultSegment.getId().toString());
+ Assert.assertEquals(
+ expected,
+ coordinator.retrieveUpgradedToSegmentIds(datasource, upgradedIds)
+ );
+ }
+
+ private void insertUsedSegments(Set<DataSegment> segments, Map<String, String> upgradedFromSegmentIdMap)
+ {
+ final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
+ insertUsedSegments(segments, upgradedFromSegmentIdMap, derbyConnector, table, mapper);
+ }
}
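(Taken together, the two lookups exercised above are inverse views of the new upgraded_from_segment_id column: retrieveUpgradedFromSegmentIds maps a segment id to the root segment that owns its load spec, while retrieveUpgradedToSegmentIds maps a root id to every used segment, including itself, that still shares that load spec. A hypothetical sketch of how a kill task could consult them before deleting from deep storage; everything except the two coordinator calls is an assumption:

    // Root segment id for each kill candidate; an absent key means the segment was never upgraded.
    final Map<String, String> rootIds = coordinator.retrieveUpgradedFromSegmentIds(dataSource, candidateIds);
    for (String segmentId : candidateIds) {
      final String rootId = rootIds.getOrDefault(segmentId, segmentId);
      // All used segments that still point at the root's load spec.
      final Set<String> sharers = coordinator
          .retrieveUpgradedToSegmentIds(dataSource, Collections.singleton(rootId))
          .getOrDefault(rootId, Collections.emptySet());
      // Keep the deep-storage files if any sharer lies outside the kill set;
      // in that case only the metadata entry may be removed.
      final boolean stillReferenced = sharers.stream().anyMatch(id -> !candidateIds.contains(id));
      if (!stillReferenced) {
        deleteFromDeepStorage(segmentId);  // hypothetical helper
      }
    }
)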
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java
index ce0e0686058..2076e5ffa46 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java
@@ -58,6 +58,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@@ -322,6 +323,8 @@ public class IndexerSqlMetadataStorageCoordinatorTestBase
.version(version)
.shardSpec(shardSpec)
.size(100)
+ // hash to get a unique load spec as segmentId has not yet been generated
+ .loadSpec(ImmutableMap.of("hash", Objects.hash(interval, version, shardSpec)))
.build();
}
@@ -559,4 +562,50 @@ public class IndexerSqlMetadataStorageCoordinatorTestBase
}
);
}
+
+ public static void insertUsedSegments(
+ Set<DataSegment> dataSegments,
+ Map<String, String> upgradedFromSegmentIdMap,
+ SQLMetadataConnector connector,
+ String table,
+ ObjectMapper jsonMapper
+ )
+ {
+ connector.retryWithHandle(
+ handle -> {
+ PreparedBatch preparedBatch = handle.prepareBatch(
+ StringUtils.format(
+ "INSERT INTO %1$s (id, dataSource, created_date, start,
%2$send%2$s, partitioned, version,"
+ + " used, payload, used_status_last_updated,
upgraded_from_segment_id) "
+ + "VALUES (:id, :dataSource, :created_date, :start, :end,
:partitioned, :version,"
+ + " :used, :payload, :used_status_last_updated,
:upgraded_from_segment_id)",
+ table,
+ connector.getQuoteString()
+ )
+ );
+ for (DataSegment segment : dataSegments) {
+ String id = segment.getId().toString();
+ preparedBatch.add()
+ .bind("id", id)
+ .bind("dataSource", segment.getDataSource())
+ .bind("created_date", DateTimes.nowUtc().toString())
+ .bind("start",
segment.getInterval().getStart().toString())
+ .bind("end",
segment.getInterval().getEnd().toString())
+ .bind("partitioned", !(segment.getShardSpec()
instanceof NoneShardSpec))
+ .bind("version", segment.getVersion())
+ .bind("used", true)
+ .bind("payload",
jsonMapper.writeValueAsBytes(segment))
+ .bind("used_status_last_updated",
DateTimes.nowUtc().toString())
+ .bind("upgraded_from_segment_id",
upgradedFromSegmentIdMap.get(segment.getId().toString()));
+ }
+
+ final int[] affectedRows = preparedBatch.execute();
+ final boolean succeeded = Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1);
+ if (!succeeded) {
+ throw new ISE("Failed to publish segments to DB");
+ }
+ return true;
+ }
+ );
+ }
}
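(Since the helper is now static, other coordinator tests can seed upgrade lineage directly. A minimal sketch, assuming segment1 and segment2 are pre-built DataSegments and the usual derby test fixtures are in scope:

    // Record that segment2's load spec was inherited from segment1.
    final Map<String, String> lineage =
        ImmutableMap.of(segment2.getId().toString(), segment1.getId().toString());
    IndexerSqlMetadataStorageCoordinatorTestBase.insertUsedSegments(
        ImmutableSet.of(segment1, segment2),
        lineage,
        derbyConnector,
        derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(),
        mapper
    );
)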
diff --git a/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java b/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java
index 0f20fc96bdc..b963f433708 100644
--- a/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/DataSegmentPlusTest.java
@@ -100,6 +100,7 @@ public class DataSegmentPlusTest
usedStatusLastUpdatedDate,
null,
null,
+ null,
null
);
@@ -108,7 +109,7 @@ public class DataSegmentPlusTest
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
- Assert.assertEquals(6, objectMap.size());
+ Assert.assertEquals(7, objectMap.size());
final Map<String, Object> segmentObjectMap = MAPPER.readValue(
MAPPER.writeValueAsString(segmentPlus.getDataSegment()),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
diff --git a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
index 4d6bbf5929b..9c52d639300 100644
--- a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
@@ -77,7 +77,7 @@ public class MetadataResourceTest
.toArray(new DataSegment[0]);
private final List<DataSegmentPlus> segmentsPlus = Arrays.stream(segments)
- .map(s -> new DataSegmentPlus(s, DateTimes.nowUtc(), DateTimes.nowUtc(), null, null, null))
+ .map(s -> new DataSegmentPlus(s, DateTimes.nowUtc(), DateTimes.nowUtc(), null, null, null, null))
.collect(Collectors.toList());
private HttpServletRequest request;
private SegmentsMetadataManager segmentsMetadataManager;