kfaraz commented on code in PR #17420:
URL: https://github.com/apache/druid/pull/17420#discussion_r1820125029


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -702,12 +711,12 @@ private TaskLockPosse createNewTaskLockPosse(LockRequest request)
    * for the given requests. Updates the holder with the allocated segment if
    * the allocation succeeds, otherwise marks it as failed.
    */
-  @VisibleForTesting
-  void allocateSegmentIds(
+  private void allocateSegmentIds(
       String dataSource,
       Interval interval,
       boolean skipSegmentLineageCheck,
-      Collection<SegmentAllocationHolder> holders
+      Collection<SegmentAllocationHolder> holders,
+      boolean skipSegmentPayloadFetchForAllocation

Review Comment:
   Please rename this argument everywhere.
   
   You can call it either `reduceMetadataIO`, same as the config, or something 
like `fetchRequiredSegmentsOnly`.
   
   ```suggestion
         boolean reduceMetadataIO
   ```



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java:
##########
@@ -73,6 +89,12 @@ public long getBatchAllocationWaitTime()
       {
         return 0;
       }
+
+      @Override
+      public boolean isSegmentAllocationReduceMetadataIO()
+      {
+        return true;

Review Comment:
   It would be nice to run the tests in this class for both `true` and `false`, 
not just `true`.
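
   A minimal sketch of the idea, assuming a hypothetical `FlagConfig` stand-in 
   for the lock config and a hypothetical `runScenario` in place of the real 
   test bodies. In the actual test class, JUnit 4's `Parameterized` runner would 
   play the role of the loop.

```java
import java.util.ArrayList;
import java.util.List;

public class BothFlagValuesSketch
{
  /** Hypothetical stand-in for the lock config; only the flag under review is modelled. */
  public interface FlagConfig
  {
    boolean isReduceMetadataIO();
  }

  /** Hypothetical scenario under test; returns a label so that each run can be checked. */
  public static String runScenario(FlagConfig config)
  {
    return config.isReduceMetadataIO() ? "reduced-io" : "full-fetch";
  }

  /** Runs the same scenario once per flag value, as a parameterized test would. */
  public static List<String> runForBothFlagValues()
  {
    final List<String> results = new ArrayList<>();
    for (boolean flag : new boolean[]{false, true}) {
      // The loop variable is effectively final, so the lambda can capture it.
      final FlagConfig config = () -> flag;
      results.add(runScenario(config));
    }
    return results;
  }

  public static void main(String[] args)
  {
    System.out.println(runForBothFlagValues());
  }
}
```

   Each existing test would then assert the same allocation result under both 
   values, which is what makes the flag safe to remove later.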



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java:
##########
@@ -36,6 +36,9 @@ public class TaskLockConfig
   @JsonProperty
   private long batchAllocationWaitTime = 0L;
 
+  @JsonProperty
+  private boolean segmentAllocationReduceMetadataIO = false;

Review Comment:
   This is used only for batch segment allocation, IIUC. Let's rename it to 
`batchAllocationReduceMetadataIO`.
   Once this has been tested thoroughly, we can remove the flag altogether and 
use the new approach for both regular and batch allocation.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java:
##########
@@ -87,6 +87,8 @@ public class SegmentAllocationQueue
   private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> keyToBatch = new ConcurrentHashMap<>();
   private final BlockingDeque<AllocateRequestBatch> processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE);
 
+  private final boolean skipSegmentPayloadFetchForAllocation;

Review Comment:
   Please rename as suggested.



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java:
##########
@@ -99,6 +121,133 @@ public void tearDown()
     emitter.flush();
   }
 
+  @Test
+  @Ignore

Review Comment:
   Let's remove this test if we are not running it as a UT. If it is meant to 
be a benchmark, please put it in a `Benchmark` class and share the details of a 
sample run in the PR description.



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2900,6 +2929,70 @@ public int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set<String
     );
   }
 
+  @VisibleForTesting
+  Set<DataSegment> retrieveUsedSegmentsForAllocation(
+      final Handle handle,
+      final String dataSource,
+      final Interval interval
+  )
+  {
+    final Set<SegmentId> overlappingSegmentIds = SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
+                                                                         .retrieveUsedSegmentIds(
+                                                                             dataSource,
+                                                                             interval
+                                                                         );
+    // Map from version -> interval -> segmentId with the smallest partitionNum
+    Map<String, Map<Interval, SegmentId>> versionIntervalToSmallestSegmentId = new HashMap<>();
+    for (SegmentId segmentId : overlappingSegmentIds) {
+      final Map<Interval, SegmentId> map
+          = versionIntervalToSmallestSegmentId.computeIfAbsent(segmentId.getVersion(), v -> new HashMap<>());
+      final SegmentId value = map.get(segmentId.getInterval());
+      if (value == null || value.getPartitionNum() > segmentId.getPartitionNum()) {
+        map.put(interval, segmentId);
+      }
+    }
+
+    // Retrieve the segments for the ids stored in the map to get the numCorePartitions
+    final Set<String> segmentIdsToRetrieve = new HashSet<>();
+    for (Map<Interval, SegmentId> itvlMap : versionIntervalToSmallestSegmentId.values()) {
+      segmentIdsToRetrieve.addAll(itvlMap.values().stream().map(SegmentId::toString).collect(Collectors.toList()));
+    }
+    final Set<DataSegment> dataSegments = retrieveSegmentsById(dataSource, segmentIdsToRetrieve);
+    final Set<String> retrievedIds = new HashSet<>();
+    final Map<String, Map<Interval, Integer>> versionIntervalToNumCorePartitions = new HashMap<>();
+    for (DataSegment segment : dataSegments) {
+      versionIntervalToNumCorePartitions.computeIfAbsent(segment.getVersion(), v -> new HashMap<>())
+                                        .put(segment.getInterval(), segment.getShardSpec().getNumCorePartitions());
+      retrievedIds.add(segment.getId().toString());
+    }
+    if (!retrievedIds.equals(segmentIdsToRetrieve)) {
+      throw DruidException.defensive(
+          "Cannot create DataSegments for segment allocations."
+          + "The used segments may have changed for dataSource[%s] and interval[%s].",
+          dataSource, interval
+      );
+    }
+
+    // Populate the required segment info

Review Comment:
   ```suggestion
       // Create dummy segments for each segmentId with only the shard spec populated
   ```
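
   For reference, the "version -> interval -> segmentId with the smallest 
   partitionNum" step in the hunk above can be sketched in isolation. This is 
   only an illustration under stated assumptions: `Id` is a hypothetical 
   simplified stand-in for `SegmentId`, intervals are plain strings, and the 
   inner map is keyed by each id's own interval, matching the lookup on 
   `segmentId.getInterval()`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SmallestPartitionSketch
{
  /** Hypothetical simplified id; the real SegmentId carries more fields. */
  public static final class Id
  {
    public final String version;
    public final String interval;
    public final int partitionNum;

    public Id(String version, String interval, int partitionNum)
    {
      this.version = version;
      this.interval = interval;
      this.partitionNum = partitionNum;
    }
  }

  /** Keeps, for each version and interval, the id with the smallest partitionNum. */
  public static Map<String, Map<String, Id>> smallestPerVersionAndInterval(List<Id> ids)
  {
    final Map<String, Map<String, Id>> result = new HashMap<>();
    for (Id id : ids) {
      final Map<String, Id> byInterval = result.computeIfAbsent(id.version, v -> new HashMap<>());
      final Id current = byInterval.get(id.interval);
      if (current == null || current.partitionNum > id.partitionNum) {
        // Key by the id's own interval so the lookup above finds this entry again.
        byInterval.put(id.interval, id);
      }
    }
    return result;
  }

  public static void main(String[] args)
  {
    final List<Id> ids = Arrays.asList(
        new Id("v1", "2024-01-01/2024-01-02", 2),
        new Id("v1", "2024-01-01/2024-01-02", 0),
        new Id("v1", "2024-01-02/2024-01-03", 1)
    );
    final Map<String, Map<String, Id>> smallest = smallestPerVersionAndInterval(ids);
    System.out.println(smallest.get("v1").get("2024-01-01/2024-01-02").partitionNum);
  }
}
```

   Only one id per version and interval survives this step, so the follow-up 
   fetch needs one segment payload per group rather than one per segment.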



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -702,12 +711,12 @@ private TaskLockPosse createNewTaskLockPosse(LockRequest request)
    * for the given requests. Updates the holder with the allocated segment if
    * the allocation succeeds, otherwise marks it as failed.
    */
-  @VisibleForTesting
-  void allocateSegmentIds(

Review Comment:
   thanks!


