kfaraz commented on code in PR #16667:
URL: https://github.com/apache/druid/pull/16667#discussion_r1673823924
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedToSegmentsResponse.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Response for the RetrieveUpgradedToSegmentIds task action
+ */

Review Comment:
Nit: not needed, the name explains it.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -300,6 +356,81 @@ private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActi
     return taskLockMap;
   }
+  private List<DataSegment> findUnreferencedSegments(
+      List<DataSegment> unusedSegments,
+      Map<String, String> upgradedFromSegmentIds,
+      TaskActionClient taskActionClient
+  ) throws IOException
+  {
+    if (unusedSegments.isEmpty() || upgradedFromSegmentIds.isEmpty()) {
+      return unusedSegments;
+    }
+
+    final Set<String> upgradedSegmentIds = new HashSet<>();

Review Comment:
```suggestion
    final Set<String> segmentIdsToCheckForUsage = new HashSet<>();
```

##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -66,6 +72,20 @@
  * The client representation of this task is {@link ClientKillUnusedSegmentsTaskQuery}.
  * JSON serialization fields of this class must correspond to those of {@link
  * ClientKillUnusedSegmentsTaskQuery}, except for {@link #id} and {@link #context} fields.
+ * <br/>
+ * <br/>
+ * The Kill task fetches the set of used segments for the interval and computes the set of their load specs. <br/>
+ * Until `limit` segments have been processed in total or all segments for the interval have been nuked: <br/>

Review Comment:
Line break not needed here as lists start in a new line anyway.
```suggestion
 * Until `limit` segments have been processed in total or all segments for the interval have been nuked:
```

##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -300,6 +356,81 @@ private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActi
     return taskLockMap;
   }
+  private List<DataSegment> findUnreferencedSegments(
+      List<DataSegment> unusedSegments,
+      Map<String, String> upgradedFromSegmentIds,
+      TaskActionClient taskActionClient
+  ) throws IOException
+  {
+    if (unusedSegments.isEmpty() || upgradedFromSegmentIds.isEmpty()) {
+      return unusedSegments;
+    }
+
+    final Set<String> upgradedSegmentIds = new HashSet<>();
+    for (DataSegment segment : unusedSegments) {
+      final String id = segment.getId().toString();
+      upgradedSegmentIds.add(id);
+      if (upgradedFromSegmentIds.containsKey(id)) {
+        upgradedSegmentIds.add(upgradedFromSegmentIds.get(id));
+      }
+    }
+
+    final Map<String, Set<String>> upgradedToSegmentIds;
+    try {
+      upgradedToSegmentIds = taskActionClient.submit(
+          new RetrieveUpgradedToSegmentIdsAction(getDataSource(), upgradedSegmentIds)
+      ).getUpgradedToSegmentIds();
+    }
+    catch (Exception e) {
+      LOG.warn(
+          e,
+          "Could not retrieve UpgradedToSegmentsResponse using task action[retrieveUpgradedToSegmentIds]."
+          + " Overlord may be on an older version."
+      );
+      return unusedSegments;
+    }
+
+    final Set<String> existingSegmentIds
+        = taskActionClient.submit(new RetrieveSegmentsByIdAction(getDataSource(), upgradedSegmentIds))
+                          .stream()
+                          .map(DataSegment::getId)
+                          .map(SegmentId::toString)
+                          .collect(Collectors.toSet());
+
+    List<DataSegment> unreferencedSegments = new ArrayList<>();

Review Comment:
```suggestion
    final List<DataSegment> segmentsToKill = new ArrayList<>();
```

##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -231,29 +252,64 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
       );
     }
-    // Kill segments
-    // Order is important here: we want the nuke action to clean up the metadata records _before_ the
-    // segments are removed from storage, this helps maintain that we will always have a storage segment if
-    // the metadata segment is present. If the segment nuke throws an exception, then the segment cleanup is
-    // abandoned.
+    // Kill segments. Order is important here:
+    // Retrieve the segment upgrade infos for the batch _before_ the segments are nuked
+    // We then want the nuke action to clean up the metadata records _before_ the segments are removed from storage.
+    // This helps maintain that we will always have a storage segment if the metadata segment is present.
+    // Determine the subset of segments to be killed from deep storage based on loadspecs.
+    // If the segment nuke throws an exception, then the segment cleanup is abandoned.
+
+    // Determine upgraded segment ids before nuking
+    final Set<String> segmentIds = unusedSegments.stream()
+                                                 .map(DataSegment::getId)
+                                                 .map(SegmentId::toString)
+                                                 .collect(Collectors.toSet());
+    final Map<String, String> upgradedFromSegmentIds = new HashMap<>();
+    try {
+      upgradedFromSegmentIds.putAll(
+          taskActionClient.submit(
+              new RetrieveUpgradedFromSegmentIdsAction(getDataSource(), segmentIds)
+          ).getUpgradedFromSegmentIds()
+      );
+    }
+    catch (Exception e) {
+      LOG.warn(
+          e,
+          "Could not retrieve UpgradedFromSegmentsResponse using task action[retrieveUpgradedFromSegmentIds]."
+          + " Overlord may be on an older version."
+      );
+    }
+
+    // Nuke Segments
+    taskActionClient.submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
-    toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
+    // Determine unreferenced segments
+    final List<DataSegment> unreferencedSegments
+        = findUnreferencedSegments(unusedSegments, upgradedFromSegmentIds, taskActionClient);
-    // Kill segments from the deep storage only if their load specs are not being used by any used segments
-    final List<DataSegment> segmentsToBeKilled = unusedSegments
+    // Kill segments from the deep storage only if their load specs are not being used by any other segments
+    // We still need to check with used load specs as segment upgrades were introduced before this change
+    final List<DataSegment> segmentsToBeKilled = unreferencedSegments
         .stream()
         .filter(unusedSegment -> unusedSegment.getLoadSpec() == null || !usedSegmentLoadSpecs.contains(unusedSegment.getLoadSpec()))
         .collect(Collectors.toList());

Review Comment:
This logic should also move to the new method.

##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -66,6 +72,20 @@
  * The client representation of this task is {@link ClientKillUnusedSegmentsTaskQuery}.
  * JSON serialization fields of this class must correspond to those of {@link
  * ClientKillUnusedSegmentsTaskQuery}, except for {@link #id} and {@link #context} fields.
+ * <br/>

Review Comment:
Thanks for the javadoc. Tip: you can use `<p>` to separate paragraphs. It looks nicer.

##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -300,6 +356,81 @@ private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActi
     return taskLockMap;
   }
+  private List<DataSegment> findUnreferencedSegments(
+      List<DataSegment> unusedSegments,
+      Map<String, String> upgradedFromSegmentIds,
+      TaskActionClient taskActionClient
+  ) throws IOException
+  {
+    if (unusedSegments.isEmpty() || upgradedFromSegmentIds.isEmpty()) {
+      return unusedSegments;
+    }
+
+    final Set<String> upgradedSegmentIds = new HashSet<>();
+    for (DataSegment segment : unusedSegments) {
+      final String id = segment.getId().toString();
+      upgradedSegmentIds.add(id);
+      if (upgradedFromSegmentIds.containsKey(id)) {
+        upgradedSegmentIds.add(upgradedFromSegmentIds.get(id));
+      }
+    }
+
+    final Map<String, Set<String>> upgradedToSegmentIds;
+    try {
+      upgradedToSegmentIds = taskActionClient.submit(
+          new RetrieveUpgradedToSegmentIdsAction(getDataSource(), upgradedSegmentIds)
+      ).getUpgradedToSegmentIds();
+    }
+    catch (Exception e) {
+      LOG.warn(
+          e,
+          "Could not retrieve UpgradedToSegmentsResponse using task action[retrieveUpgradedToSegmentIds]."
+          + " Overlord may be on an older version."
+      );
+      return unusedSegments;
+    }
+
+    final Set<String> existingSegmentIds
+        = taskActionClient.submit(new RetrieveSegmentsByIdAction(getDataSource(), upgradedSegmentIds))
+                          .stream()
+                          .map(DataSegment::getId)
+                          .map(SegmentId::toString)
+                          .collect(Collectors.toSet());
+
+    List<DataSegment> unreferencedSegments = new ArrayList<>();
+    for (DataSegment segment : unusedSegments) {
+      final String id = segment.getId().toString();
+
+      // If the segment still exists for some reason, do not kill it
+      if (existingSegmentIds.contains(id)) {
+        break;
+      }
+
+      // If the segment is the parent of existing segments, do not kill it
+      if (upgradedToSegmentIds.containsKey(id)) {

Review Comment:
`upgradedToSegmentIds` can be `null` here.

##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -300,6 +356,81 @@ private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActi
     return taskLockMap;
   }
+  private List<DataSegment> findUnreferencedSegments(
+      List<DataSegment> unusedSegments,
+      Map<String, String> upgradedFromSegmentIds,
+      TaskActionClient taskActionClient
+  ) throws IOException
+  {
+    if (unusedSegments.isEmpty() || upgradedFromSegmentIds.isEmpty()) {
+      return unusedSegments;
+    }
+
+    final Set<String> upgradedSegmentIds = new HashSet<>();
+    for (DataSegment segment : unusedSegments) {
+      final String id = segment.getId().toString();
+      upgradedSegmentIds.add(id);
+      if (upgradedFromSegmentIds.containsKey(id)) {
+        upgradedSegmentIds.add(upgradedFromSegmentIds.get(id));
+      }

Review Comment:
```suggestion
      if (upgradedFromSegmentIds.containsKey(id)) {
        upgradedSegmentIds.add(upgradedFromSegmentIds.get(id));
      } else {
        // Check for the segment ID itself if it was not upgraded from some other segment
        upgradedSegmentIds.add(id);
      }
```

##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -231,29 +252,64 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
       );
     }
-    // Kill segments
-    // Order is important here: we want the nuke action to clean up the metadata records _before_ the
-    // segments are removed from storage, this helps maintain that we will always have a storage segment if
-    // the metadata segment is present. If the segment nuke throws an exception, then the segment cleanup is
-    // abandoned.
+    // Kill segments. Order is important here:
+    // Retrieve the segment upgrade infos for the batch _before_ the segments are nuked
+    // We then want the nuke action to clean up the metadata records _before_ the segments are removed from storage.
+    // This helps maintain that we will always have a storage segment if the metadata segment is present.
+    // Determine the subset of segments to be killed from deep storage based on loadspecs.
+    // If the segment nuke throws an exception, then the segment cleanup is abandoned.
+
+    // Determine upgraded segment ids before nuking
+    final Set<String> segmentIds = unusedSegments.stream()
+                                                 .map(DataSegment::getId)
+                                                 .map(SegmentId::toString)
+                                                 .collect(Collectors.toSet());
+    final Map<String, String> upgradedFromSegmentIds = new HashMap<>();
+    try {
+      upgradedFromSegmentIds.putAll(
+          taskActionClient.submit(
+              new RetrieveUpgradedFromSegmentIdsAction(getDataSource(), segmentIds)
+          ).getUpgradedFromSegmentIds()
+      );
+    }
+    catch (Exception e) {
+      LOG.warn(
+          e,
+          "Could not retrieve UpgradedFromSegmentsResponse using task action[retrieveUpgradedFromSegmentIds]."

Review Comment:
```suggestion
          "Could not retrieve parent segment IDs using task action[retrieveUpgradedFromSegmentIds]."
```

##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpgradedFromSegmentsResponse.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+
+/**
+ * Response for the RetrieveUpgradedFromSegmentIds task action
+ */

Review Comment:
Nit: not really needed.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -300,6 +356,81 @@ private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActi
     return taskLockMap;
   }
+  private List<DataSegment> findUnreferencedSegments(
+      List<DataSegment> unusedSegments,
+      Map<String, String> upgradedFromSegmentIds,
+      TaskActionClient taskActionClient
+  ) throws IOException
+  {
+    if (unusedSegments.isEmpty() || upgradedFromSegmentIds.isEmpty()) {
+      return unusedSegments;
+    }
+
+    final Set<String> upgradedSegmentIds = new HashSet<>();
+    for (DataSegment segment : unusedSegments) {
+      final String id = segment.getId().toString();
+      upgradedSegmentIds.add(id);
+      if (upgradedFromSegmentIds.containsKey(id)) {
+        upgradedSegmentIds.add(upgradedFromSegmentIds.get(id));
+      }
+    }
+
+    final Map<String, Set<String>> upgradedToSegmentIds;
+    try {
+      upgradedToSegmentIds = taskActionClient.submit(
+          new RetrieveUpgradedToSegmentIdsAction(getDataSource(), upgradedSegmentIds)
+      ).getUpgradedToSegmentIds();
+    }
+    catch (Exception e) {
+      LOG.warn(
+          e,
+          "Could not retrieve UpgradedToSegmentsResponse using task action[retrieveUpgradedToSegmentIds]."
+          + " Overlord may be on an older version."
+      );
+      return unusedSegments;
+    }
+
+    final Set<String> existingSegmentIds
+        = taskActionClient.submit(new RetrieveSegmentsByIdAction(getDataSource(), upgradedSegmentIds))

Review Comment:
Why is this action needed? Doesn't the new `UpgradedToSegmentsResponse` already cover this case?
See the comment here: https://github.com/apache/druid/pull/16667#issuecomment-2205060939

##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -300,6 +356,81 @@ private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActi
     return taskLockMap;
   }
+  private List<DataSegment> findUnreferencedSegments(
+      List<DataSegment> unusedSegments,
+      Map<String, String> upgradedFromSegmentIds,
+      TaskActionClient taskActionClient
+  ) throws IOException
+  {
+    if (unusedSegments.isEmpty() || upgradedFromSegmentIds.isEmpty()) {
+      return unusedSegments;
+    }
+
+    final Set<String> upgradedSegmentIds = new HashSet<>();
+    for (DataSegment segment : unusedSegments) {
+      final String id = segment.getId().toString();
+      upgradedSegmentIds.add(id);
+      if (upgradedFromSegmentIds.containsKey(id)) {
+        upgradedSegmentIds.add(upgradedFromSegmentIds.get(id));
+      }
+    }
+
+    final Map<String, Set<String>> upgradedToSegmentIds;
+    try {
+      upgradedToSegmentIds = taskActionClient.submit(
+          new RetrieveUpgradedToSegmentIdsAction(getDataSource(), upgradedSegmentIds)
+      ).getUpgradedToSegmentIds();
+    }
+    catch (Exception e) {
+      LOG.warn(
+          e,
+          "Could not retrieve UpgradedToSegmentsResponse using task action[retrieveUpgradedToSegmentIds]."
+          + " Overlord may be on an older version."
+      );
+      return unusedSegments;
+    }
+
+    final Set<String> existingSegmentIds
+        = taskActionClient.submit(new RetrieveSegmentsByIdAction(getDataSource(), upgradedSegmentIds))
+                          .stream()
+                          .map(DataSegment::getId)
+                          .map(SegmentId::toString)
+                          .collect(Collectors.toSet());
+
+    List<DataSegment> unreferencedSegments = new ArrayList<>();
+    for (DataSegment segment : unusedSegments) {
+      final String id = segment.getId().toString();
+
+      // If the segment still exists for some reason, do not kill it
+      if (existingSegmentIds.contains(id)) {
+        break;

Review Comment:
Did you want to perform `break` here or `continue`?
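The question above matters because the two statements behave very differently inside this loop. A minimal, hypothetical illustration over plain string IDs (not the PR's code; `BreakVsContinueSketch` and `collectKillable` are made-up names) shows that with `break` the first still-existing segment ends the loop and every later segment is silently left out, whereas `continue` skips only that one segment:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical illustration with String IDs; not the PR's code.
class BreakVsContinueSketch
{
  // Collects IDs that do not appear in existingIds. When an existing ID is hit,
  // either leaves the loop (`break`) or skips just that ID (`continue`).
  static List<String> collectKillable(List<String> unusedIds, Set<String> existingIds, boolean useBreak)
  {
    final List<String> toKill = new ArrayList<>();
    for (String id : unusedIds) {
      if (existingIds.contains(id)) {
        if (useBreak) {
          break;     // leaves the loop: every later ID is silently skipped
        }
        continue;    // skips only this ID and moves on to the next one
      }
      toKill.add(id);
    }
    return toKill;
  }

  public static void main(String[] args)
  {
    final List<String> unused = List.of("seg_1", "seg_2", "seg_3");
    final Set<String> existing = Set.of("seg_2");
    System.out.println(collectKillable(unused, existing, true));   // [seg_1]
    System.out.println(collectKillable(unused, existing, false));  // [seg_1, seg_3]
  }
}
```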
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -300,6 +356,81 @@ private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActi
     return taskLockMap;
   }
+  private List<DataSegment> findUnreferencedSegments(

Review Comment:
```suggestion
  private List<DataSegment> getKillableSegments(
```

##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -300,6 +356,81 @@ private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActi
     return taskLockMap;
   }
+  private List<DataSegment> findUnreferencedSegments(
+      List<DataSegment> unusedSegments,
+      Map<String, String> upgradedFromSegmentIds,
+      TaskActionClient taskActionClient
+  ) throws IOException
+  {
+    if (unusedSegments.isEmpty() || upgradedFromSegmentIds.isEmpty()) {
+      return unusedSegments;
+    }
+
+    final Set<String> upgradedSegmentIds = new HashSet<>();
+    for (DataSegment segment : unusedSegments) {
+      final String id = segment.getId().toString();
+      upgradedSegmentIds.add(id);
+      if (upgradedFromSegmentIds.containsKey(id)) {
+        upgradedSegmentIds.add(upgradedFromSegmentIds.get(id));
+      }
+    }
+
+    final Map<String, Set<String>> upgradedToSegmentIds;
+    try {
+      upgradedToSegmentIds = taskActionClient.submit(
+          new RetrieveUpgradedToSegmentIdsAction(getDataSource(), upgradedSegmentIds)
+      ).getUpgradedToSegmentIds();
+    }
+    catch (Exception e) {
+      LOG.warn(
+          e,
+          "Could not retrieve UpgradedToSegmentsResponse using task action[retrieveUpgradedToSegmentIds]."
+          + " Overlord may be on an older version."
+      );
+      return unusedSegments;
+    }
+
+    final Set<String> existingSegmentIds
+        = taskActionClient.submit(new RetrieveSegmentsByIdAction(getDataSource(), upgradedSegmentIds))
+                          .stream()
+                          .map(DataSegment::getId)
+                          .map(SegmentId::toString)
+                          .collect(Collectors.toSet());
+
+    List<DataSegment> unreferencedSegments = new ArrayList<>();
+    for (DataSegment segment : unusedSegments) {

Review Comment:
This loop and the whole method in general have become too complicated. The method needs to do the following instead:
```java
private Set<DataSegment> findKillableSegments(...)
{
  // Determine parentId for each unused segment. Several unused segments may share
  // the same parent, so collect them in a set instead of overwriting a single entry.
  final Map<String, Set<DataSegment>> parentIdToUnusedSegments = new HashMap<>();
  for (DataSegment segment : unusedSegments) {
    final String segmentId = segment.getId().toString();
    // A segment that was not upgraded from another segment is its own parent
    final String parentId = upgradedFromSegmentIds.getOrDefault(segmentId, segmentId);
    parentIdToUnusedSegments.computeIfAbsent(parentId, k -> new HashSet<>()).add(segment);
  }

  // Check if the parent or any of its children exist in metadata store
  try {
    final UpgradedToSegmentsResponse response = taskActionClient.submit(
        new RetrieveUpgradedToSegmentIdsAction(getDataSource(), parentIdToUnusedSegments.keySet())
    );
    if (response != null && response.getUpgradedToSegmentIds() != null) {
      response.getUpgradedToSegmentIds().forEach((parent, children) -> {
        if (!CollectionUtils.isNullOrEmpty(children)) {
          // Do not kill segment if its parent or any of its siblings still exist in metadata store
          parentIdToUnusedSegments.remove(parent);
        }
      });
    }
  }
  catch (Exception e) {
    LOG.warn(
        e,
        "Could not retrieve UpgradedToSegmentsResponse using task action[retrieveUpgradedToSegmentIds]."
        + " Overlord may be on an older version."
    );
  }

  // TODO: filter out based on used segment load specs
  return parentIdToUnusedSegments.values()
      .stream()
      .flatMap(Set::stream)
      .collect(Collectors.toSet());
}
```

-- This is an automated message from the Apache Git Service.
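To make the intended behaviour of the suggested `findKillableSegments` easy to check outside Druid, here is a self-contained sketch over plain Java collections. It is a hedged model, not Druid code: `KillableSegmentsSketch`, `SegmentStub` and `localLoadSpec` are hypothetical names; `SegmentStub` keeps only an ID and a load-spec map in place of `DataSegment`; the `upgradedToSegmentIds` map stands in for the `RetrieveUpgradedToSegmentIdsAction` response and is assumed to list, per parent ID, the related segment IDs still present in the metadata store. The last step fills in the TODO using the same condition as the existing `segmentsToBeKilled` load-spec filter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model: SegmentStub stands in for DataSegment (ID + load spec only), and
// upgradedToSegmentIds stands in for the RetrieveUpgradedToSegmentIdsAction response.
class KillableSegmentsSketch
{
  static final class SegmentStub
  {
    final String id;
    final Map<String, Object> loadSpec;

    SegmentStub(String id, Map<String, Object> loadSpec)
    {
      this.id = id;
      this.loadSpec = loadSpec;
    }
  }

  static List<SegmentStub> findKillableSegments(
      List<SegmentStub> unusedSegments,
      Map<String, String> upgradedFromSegmentIds,
      Map<String, Set<String>> upgradedToSegmentIds,  // may be null, e.g. if the action failed
      Set<Map<String, Object>> usedSegmentLoadSpecs
  )
  {
    // Group unused segments by parent ID; a segment with no parent is its own parent.
    // Siblings sharing a parent all stay in the group instead of overwriting each other.
    final Map<String, List<SegmentStub>> parentIdToUnusedSegments = new HashMap<>();
    for (SegmentStub segment : unusedSegments) {
      final String parentId = upgradedFromSegmentIds.getOrDefault(segment.id, segment.id);
      parentIdToUnusedSegments.computeIfAbsent(parentId, k -> new ArrayList<>()).add(segment);
    }

    // Drop a whole group if the parent or any of its children is still in the metadata store.
    if (upgradedToSegmentIds != null) {
      upgradedToSegmentIds.forEach((parent, children) -> {
        if (children != null && !children.isEmpty()) {
          parentIdToUnusedSegments.remove(parent);
        }
      });
    }

    // Finally skip segments whose load spec is still referenced by a used segment.
    final List<SegmentStub> killable = new ArrayList<>();
    for (List<SegmentStub> group : parentIdToUnusedSegments.values()) {
      for (SegmentStub segment : group) {
        if (segment.loadSpec == null || !usedSegmentLoadSpecs.contains(segment.loadSpec)) {
          killable.add(segment);
        }
      }
    }
    return killable;
  }

  static Map<String, Object> localLoadSpec(String path)
  {
    final Map<String, Object> spec = new HashMap<>();
    spec.put("type", "local");
    spec.put("path", path);
    return spec;
  }

  public static void main(String[] args)
  {
    final Map<String, String> upgradedFrom = new HashMap<>();
    upgradedFrom.put("seg_v2a", "seg_v1");
    upgradedFrom.put("seg_v2b", "seg_v1");

    // Another child of seg_v1 is still present, so neither seg_v2a nor seg_v2b may be killed.
    final Map<String, Set<String>> upgradedTo = new HashMap<>();
    upgradedTo.put("seg_v1", Set.of("seg_v2c"));

    final Set<Map<String, Object>> usedLoadSpecs = new HashSet<>();
    usedLoadSpecs.add(localLoadSpec("/deep/shared.zip"));

    final List<SegmentStub> unused = List.of(
        new SegmentStub("seg_v2a", localLoadSpec("/deep/v1.zip")),
        new SegmentStub("seg_v2b", localLoadSpec("/deep/v1.zip")),
        new SegmentStub("seg_x", localLoadSpec("/deep/x.zip")),
        new SegmentStub("seg_y", localLoadSpec("/deep/shared.zip"))
    );

    for (SegmentStub s : findKillableSegments(unused, upgradedFrom, upgradedTo, usedLoadSpecs)) {
      System.out.println(s.id);  // prints only seg_x
    }
  }
}
```

Running `main` prints only `seg_x`: both siblings of `seg_v1` are kept because another child of that parent still exists, and `seg_y` is kept because a used segment shares its load spec.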
