abhishekrb19 commented on code in PR #16667:
URL: https://github.com/apache/druid/pull/16667#discussion_r1659493204
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2923,6 +2956,130 @@ public int deleteUpgradeSegmentsForTask(final String
taskId)
);
}
+ @Override
+ public Set<DataSegment> findSegmentsWithUnreferencedLoadSpecs(final
Set<DataSegment> segments)
+ {
+ if (segments.isEmpty()) {
+ return segments;
+ }
+ final String dataSource = segments.stream()
+ .findFirst()
+ .get()
+ .getDataSource();
+ final List<String> segmentIds = segments.stream()
+ .map(DataSegment::getId)
+ .map(SegmentId::toString)
+ .collect(Collectors.toList());
+ Map<String, String> idToRootMap = new
HashMap<>(getRootSegmentIds(dataSource, segmentIds));
+ final Set<String> rootSegmentIds = new HashSet<>(segmentIds);
+ rootSegmentIds.addAll(idToRootMap.values());
+ idToRootMap.putAll(getSegmentsWithRootSegmentIds(dataSource, new
ArrayList<>(rootSegmentIds)));
+ final Map<String, Set<String>> rootToIdsMap = new HashMap<>();
+ for (Map.Entry<String, String> idAndRoot : idToRootMap.entrySet()) {
+ rootToIdsMap.computeIfAbsent(idAndRoot.getValue(), root -> new
HashSet<>())
+ .add(idAndRoot.getKey());
+ }
+ final Set<DataSegment> segmentsWithUnreferencedLoadSpecs = new HashSet<>();
+ final Set<String> existingSegmentIds = retrieveSegmentsById(dataSource,
rootSegmentIds).stream()
+
.map(DataSegment::getId)
+
.map(SegmentId::toString)
+
.collect(Collectors.toSet());
+ for (DataSegment segment : segments) {
+ final String id = segment.getId().toString();
+ final Set<String> conflicts = new HashSet<>();
+ if (rootToIdsMap.containsKey(id)) {
+ // Add children
+ conflicts.addAll(rootToIdsMap.get(id));
+ }
+ final String parent = idToRootMap.get(id);
+ if (parent != null) {
+ // add parent if it exists
+ if (existingSegmentIds.contains(parent)) {
+ conflicts.add(parent);
+ }
+ // add siblings
+ if (rootToIdsMap.containsKey(parent)) {
+ conflicts.addAll(rootToIdsMap.get(parent));
+ }
+ }
+ // Remove segments being deleted
+ segmentIds.forEach(conflicts::remove);
+ if (conflicts.isEmpty()) {
+ segmentsWithUnreferencedLoadSpecs.add(segment);
+ }
+ }
+ return segmentsWithUnreferencedLoadSpecs;
+ }
+
+ @VisibleForTesting
+ Map<String, String> getRootSegmentIds(final String dataSource, final
List<String> segmentIds)
Review Comment:
A javadoc here would be nice
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2923,6 +2956,130 @@ public int deleteUpgradeSegmentsForTask(final String
taskId)
);
}
+ @Override
+ public Set<DataSegment> findSegmentsWithUnreferencedLoadSpecs(final
Set<DataSegment> segments)
+ {
+ if (segments.isEmpty()) {
+ return segments;
+ }
+ final String dataSource = segments.stream()
Review Comment:
Since we don't validate that the segments all belong to the same data source
(like some other methods do), we can at least call that assumption out in the
interface method's javadoc.
##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -587,6 +587,11 @@ protected void alterSegmentTable()
Map<String, String> columnNameTypes = new HashMap<>();
columnNameTypes.put("used_status_last_updated", "VARCHAR(255)");
+ // root_segment_id is the segment id is the first / root segment to which
the same load spec originally belonged
Review Comment:
Typo here: "is the segment id is the first / root segment" — should probably
read "is the id of the first / root segment".
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java:
##########
@@ -54,26 +56,33 @@ public Set<DataSegment> getSegments()
}
@Override
- public TypeReference<Void> getReturnTypeReference()
+ public TypeReference<Set<DataSegment>> getReturnTypeReference()
{
- return new TypeReference<Void>()
+ return new TypeReference<Set<DataSegment>>()
{
};
}
@Override
- public Void perform(Task task, TaskActionToolbox toolbox)
+ public Set<DataSegment> perform(Task task, TaskActionToolbox toolbox)
{
TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(),
segments);
-
+ final Set<DataSegment> segmentsToKill = new HashSet<>();
try {
toolbox.getTaskLockbox().doInCriticalSection(
task,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.builder()
.onValidLocks(
() -> {
-
toolbox.getIndexerMetadataStorageCoordinator().deleteSegments(segments);
+ final IndexerMetadataStorageCoordinator
coordinator
+ =
toolbox.getIndexerMetadataStorageCoordinator();
+
+ // Find subset of segments with unreferenced
load specs assuming successful nuking
+
segmentsToKill.addAll(coordinator.findSegmentsWithUnreferencedLoadSpecs(segments));
+
Review Comment:
Design wise, it seems strange that the nuke task action is doing this.
Shouldn't the kill task be responsible for determining this set or perhaps a
separate task action?
If there's a solid rationale for doing this, can you please update the
javadocs so someone reading the code isn't caught by surprise.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -237,14 +237,26 @@ public TaskStatus runTask(TaskToolbox toolbox) throws
Exception
// the metadata segment is present. If the segment nuke throws an
exception, then the segment cleanup is
// abandoned.
- toolbox.getTaskActionClient().submit(new SegmentNukeAction(new
HashSet<>(unusedSegments)));
-
- // Kill segments from the deep storage only if their load specs are not
being used by any used segments
- final List<DataSegment> segmentsToBeKilled = unusedSegments
- .stream()
- .filter(unusedSegment -> unusedSegment.getLoadSpec() == null
- ||
!usedSegmentLoadSpecs.contains(unusedSegment.getLoadSpec()))
- .collect(Collectors.toList());
+ final List<DataSegment> segmentsToBeKilled;
+
+ Set<DataSegment> segmentsWithUnreferencedLoadSpecs
+ = toolbox.getTaskActionClient().submit(new SegmentNukeAction(new
HashSet<>(unusedSegments)));
Review Comment:
Please see my comment above. I would suggest adding a new task action for
this instead of piggybacking on the functionality of `SegmentNukeAction`, which
should only nuke segments IMO.
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1348,6 +1353,21 @@ public int hashCode()
}
}
+ private static void bindColumnValuesToQueryWithInCondition(
Review Comment:
This looks identical to the private static method:
`SqlSegmentsMetadataQuery#bindColumnValuesToQueryWithInCondition`. Can we reuse
that method by making it package-private and removing this duplicate method?
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java:
##########
@@ -54,26 +56,33 @@ public Set<DataSegment> getSegments()
}
@Override
- public TypeReference<Void> getReturnTypeReference()
+ public TypeReference<Set<DataSegment>> getReturnTypeReference()
{
- return new TypeReference<Void>()
+ return new TypeReference<Set<DataSegment>>()
{
};
}
@Override
- public Void perform(Task task, TaskActionToolbox toolbox)
+ public Set<DataSegment> perform(Task task, TaskActionToolbox toolbox)
Review Comment:
Also, we are changing the semantics of an existing task action? Will this be
a breaking change?
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java:
##########
@@ -54,26 +56,33 @@ public Set<DataSegment> getSegments()
}
@Override
- public TypeReference<Void> getReturnTypeReference()
+ public TypeReference<Set<DataSegment>> getReturnTypeReference()
{
- return new TypeReference<Void>()
+ return new TypeReference<Set<DataSegment>>()
{
};
}
@Override
- public Void perform(Task task, TaskActionToolbox toolbox)
+ public Set<DataSegment> perform(Task task, TaskActionToolbox toolbox)
{
TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(),
segments);
-
+ final Set<DataSegment> segmentsToKill = new HashSet<>();
try {
toolbox.getTaskLockbox().doInCriticalSection(
task,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.builder()
.onValidLocks(
() -> {
-
toolbox.getIndexerMetadataStorageCoordinator().deleteSegments(segments);
+ final IndexerMetadataStorageCoordinator
coordinator
+ =
toolbox.getIndexerMetadataStorageCoordinator();
+
+ // Find subset of segments with unreferenced
load specs assuming successful nuking
+
segmentsToKill.addAll(coordinator.findSegmentsWithUnreferencedLoadSpecs(segments));
+
Review Comment:
Design wise, it seems strange that the nuke task action is doing this.
Shouldn't the kill task be responsible for determining this set or perhaps a
separate task action?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]