AmatyaAvadhanula commented on code in PR #16144:
URL: https://github.com/apache/druid/pull/16144#discussion_r1566973978


##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -281,99 +279,74 @@ public int markSegmentsAsUnusedWithinInterval(String 
dataSource, Interval interv
   }
 
   /**
-   * Fetches all the pending segments, whose interval overlaps with the given
-   * search interval and has a sequence_name that begins with one of the 
prefixes in sequenceNamePrefixFilter
-   * from the metadata store. Returns a Map from the pending segment ID to the 
sequence name.
+   * Fetches all the pending segments, whose interval overlaps with the given 
search interval, from the metadata store.
    */
   @VisibleForTesting
-  Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
+  List<PendingSegmentRecord> getPendingSegmentsForIntervalWithHandle(
       final Handle handle,
       final String dataSource,
-      final Interval interval,
-      final Set<String> sequenceNamePrefixFilter
-  ) throws IOException
+      final Interval interval
+  )
   {
-    if (sequenceNamePrefixFilter.isEmpty()) {
-      return Collections.emptyMap();
-    }
-
-    final List<String> sequenceNamePrefixes = new 
ArrayList<>(sequenceNamePrefixFilter);
-    final List<String> sequenceNamePrefixConditions = new ArrayList<>();
-    for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
-      sequenceNamePrefixConditions.add(StringUtils.format("(sequence_name LIKE 
:prefix%d)", i));
-    }
+    boolean compareIntervalEndpointsAsStrings = 
Intervals.canCompareEndpointsAsStrings(interval);
 
-    String sql = "SELECT sequence_name, payload"
+    String sql = "SELECT payload, sequence_name, sequence_prev_id, 
task_allocator_id, upgraded_from_segment_id"
                  + " FROM " + dbTables.getPendingSegmentsTable()
-                 + " WHERE dataSource = :dataSource"
-                 + " AND start < :end"
-                 + StringUtils.format(" AND %1$send%1$s > :start", 
connector.getQuoteString())
-                 + " AND ( " + String.join(" OR ", 
sequenceNamePrefixConditions) + " )";
+                 + " WHERE dataSource = :dataSource";
+    if (compareIntervalEndpointsAsStrings) {
+      sql = sql
+            + " AND start < :end"
+            + StringUtils.format(" AND %1$send%1$s > :start", 
connector.getQuoteString());
+    }
 
     Query<Map<String, Object>> query = handle.createQuery(sql)
-                                             .bind("dataSource", dataSource)
-                                             .bind("start", 
interval.getStart().toString())
-                                             .bind("end", 
interval.getEnd().toString());
-
-    for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
-      query.bind(StringUtils.format("prefix%d", i), 
sequenceNamePrefixes.get(i) + "%");
+                                             .bind("dataSource", dataSource);
+    if (compareIntervalEndpointsAsStrings) {
+      query = query.bind("start", interval.getStart().toString())
+                   .bind("end", interval.getEnd().toString());
     }
 
-    final ResultIterator<PendingSegmentsRecord> dbSegments =
-        query.map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r))
-             .iterator();
 
-    final Map<SegmentIdWithShardSpec, String> pendingSegmentToSequenceName = 
new HashMap<>();
-    while (dbSegments.hasNext()) {
-      PendingSegmentsRecord record = dbSegments.next();
-      final SegmentIdWithShardSpec identifier = 
jsonMapper.readValue(record.payload, SegmentIdWithShardSpec.class);
-
-      if (interval.overlaps(identifier.getInterval())) {
-        pendingSegmentToSequenceName.put(identifier, record.sequenceName);
+    final ResultIterator<PendingSegmentRecord> pendingSegmentIterator =
+        query.map((index, r, ctx) -> PendingSegmentRecord.fromResultSet(r, 
jsonMapper))
+             .iterator();
+    final ImmutableList.Builder<PendingSegmentRecord> pendingSegments = 
ImmutableList.builder();
+    while (pendingSegmentIterator.hasNext()) {
+      final PendingSegmentRecord pendingSegment = 
pendingSegmentIterator.next();
+      if (compareIntervalEndpointsAsStrings || 
pendingSegment.getId().getInterval().overlaps(interval)) {
+        pendingSegments.add(pendingSegment);
       }
     }
-
-    dbSegments.close();
-
-    return pendingSegmentToSequenceName;
+    pendingSegmentIterator.close();
+    return pendingSegments.build();
   }
 
-  private Map<SegmentIdWithShardSpec, String> 
getPendingSegmentsForIntervalWithHandle(
+  List<PendingSegmentRecord> getPendingSegmentsForTaskAllocatorIdWithHandle(

Review Comment:
   It is being used to test pending segment upgrade in 
`IndexerSQLMetadatastorageCoordinatorTest`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to