kfaraz commented on code in PR #16667:
URL: https://github.com/apache/druid/pull/16667#discussion_r1677293983


##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2923,6 +2944,87 @@ public int deleteUpgradeSegmentsForTask(final String 
taskId)
     );
   }
 
+  @Override
+  public Map<String, String> retrieveUpgradedFromSegmentIds(
+      final String dataSource,
+      final Set<String> segmentIds
+  )
+  {
+    if (segmentIds.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    final List<String> segmentIdList = ImmutableList.copyOf(segmentIds);
+    final String sql = StringUtils.format(
+        "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = 
:dataSource %s",
+        dbTables.getSegmentsTable(),
+        SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id", 
segmentIdList)
+    );
+    final Map<String, String> upgradedFromSegmentIds = new HashMap<>();
+    connector.retryWithHandle(
+        handle -> {
+          Query<Map<String, Object>> query = handle.createQuery(sql)

Review Comment:
   Follow up work:
   This query should probably be executed on batches of 100 segment IDs at a 
time.



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2923,6 +2944,87 @@ public int deleteUpgradeSegmentsForTask(final String 
taskId)
     );
   }
 
+  @Override
+  public Map<String, String> retrieveUpgradedFromSegmentIds(
+      final String dataSource,
+      final Set<String> segmentIds
+  )
+  {
+    if (segmentIds.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    final List<String> segmentIdList = ImmutableList.copyOf(segmentIds);
+    final String sql = StringUtils.format(
+        "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = 
:dataSource %s",
+        dbTables.getSegmentsTable(),
+        SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id", 
segmentIdList)
+    );
+    final Map<String, String> upgradedFromSegmentIds = new HashMap<>();
+    connector.retryWithHandle(
+        handle -> {
+          Query<Map<String, Object>> query = handle.createQuery(sql)
+                                                   .bind("dataSource", 
dataSource);
+          
SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition("id", 
segmentIdList, query);
+          return query.map((index, r, ctx) -> {
+            final String id = r.getString(1);
+            final String upgradedFromSegmentId = r.getString(2);
+            if (upgradedFromSegmentId != null) {
+              upgradedFromSegmentIds.put(id, upgradedFromSegmentId);
+            }
+            return null;
+          }).list();
+        }
+    );
+    return upgradedFromSegmentIds;
+  }
+
+  @Override
+  public Map<String, Set<String>> retrieveUpgradedToSegmentIds(
+      final String dataSource,
+      final Set<String> segmentIds
+  )
+  {
+    if (segmentIds.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    final List<String> upgradedFromSegmentIdList = 
ImmutableList.copyOf(segmentIds);
+    final String sql = StringUtils.format(
+        "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = 
:dataSource %s",
+        dbTables.getSegmentsTable(),
+        SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn(
+            "upgraded_from_segment_id",
+            upgradedFromSegmentIdList
+        )
+    );
+    final Map<String, Set<String>> upgradedToSegmentIds = new HashMap<>();
+    retrieveSegmentsById(dataSource, segmentIds)
+        .stream()
+        .map(DataSegment::getId)
+        .map(SegmentId::toString)
+        .forEach(id -> upgradedToSegmentIds.computeIfAbsent(id, k -> new 
HashSet<>()).add(id));
+    connector.retryWithHandle(
+        handle -> {
+          Query<Map<String, Object>> query = handle.createQuery(sql)

Review Comment:
   Follow up work:
   This one should probably be broken up in batches too.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -300,6 +348,55 @@ private NavigableMap<DateTime, List<TaskLock>> 
getNonRevokedTaskLockMap(TaskActi
     return taskLockMap;
   }
 
+  private List<DataSegment> getKillableSegments(

Review Comment:
   Follow up work:
   Please add a small javadoc here.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -63,9 +69,23 @@
 import java.util.stream.Collectors;
 
 /**
+ * <p/>
  * The client representation of this task is {@link 
ClientKillUnusedSegmentsTaskQuery}.
  * JSON serialization fields of this class must correspond to those of {@link
  * ClientKillUnusedSegmentsTaskQuery}, except for {@link #id} and {@link 
#context} fields.
+ * <p/>
+ * The Kill task fetches the set of used segments for the interval and 
computes the set of their load specs. <br/>
+ * Until `limit` segments have been processed in total or all segments for the 
interval have been nuked:
+ * <ol>
+ * <li> Fetch at most `batchSize` unused segments from the metadata store. 
</li>
+ * <li> Determine the mapping from these segments to their parents *before* 
nuking the segments. </li>
+ * <li> Nuke the batch of unused segments from the metadata store. </li>
+ * <li> Determine the mapping of the set of parents to all their children. 
</li>
+ * <li> Check if unused or parent segments exist. </li>
+ * <li> Find the unreferenced segments. </li>

Review Comment:
   thanks for the javadoc!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to