kfaraz commented on code in PR #17420:
URL: https://github.com/apache/druid/pull/17420#discussion_r1820125029
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -702,12 +711,12 @@ private TaskLockPosse createNewTaskLockPosse(LockRequest request)
* for the given requests. Updates the holder with the allocated segment if
* the allocation succeeds, otherwise marks it as failed.
*/
- @VisibleForTesting
- void allocateSegmentIds(
+ private void allocateSegmentIds(
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
- Collection<SegmentAllocationHolder> holders
+ Collection<SegmentAllocationHolder> holders,
+ boolean skipSegmentPayloadFetchForAllocation
Review Comment:
Please rename this argument everywhere.
You could call it `reduceMetadataIO`, matching the config, or
`fetchRequiredSegmentsOnly`, or something similar.
```suggestion
boolean reduceMetadataIO
```
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java:
##########
@@ -73,6 +89,12 @@ public long getBatchAllocationWaitTime()
{
return 0;
}
+
+ @Override
+ public boolean isSegmentAllocationReduceMetadataIO()
+ {
+ return true;
Review Comment:
It would be nice to run the tests in this class with both `true` and `false`,
not just `true`.
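In JUnit 4 this is usually done with the `Parameterized` runner (`@RunWith(Parameterized.class)` plus a `@Parameterized.Parameters` method that supplies `true` and `false`). The dependency-free sketch below shows the same idea: every check runs once per flag value and fails with the offending value. `TestTaskLockConfig` and `BothModesRunner` are hypothetical names, not classes from the PR; only the two getters mirror the override in the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for the TaskLockConfig override in the test;
// only the two overridden getters are modelled.
class TestTaskLockConfig {
  private final boolean reduceMetadataIO;

  TestTaskLockConfig(boolean reduceMetadataIO) {
    this.reduceMetadataIO = reduceMetadataIO;
  }

  long getBatchAllocationWaitTime() {
    return 0;
  }

  boolean isSegmentAllocationReduceMetadataIO() {
    return reduceMetadataIO;
  }
}

class BothModesRunner {
  // Runs the same check once per flag value, failing fast with the offending value,
  // and returns the flag values that were exercised, in order.
  static List<Boolean> runForBothModes(Predicate<TestTaskLockConfig> check) {
    List<Boolean> exercised = new ArrayList<>();
    for (boolean reduceMetadataIO : new boolean[]{true, false}) {
      if (!check.test(new TestTaskLockConfig(reduceMetadataIO))) {
        throw new AssertionError("Check failed with reduceMetadataIO=" + reduceMetadataIO);
      }
      exercised.add(reduceMetadataIO);
    }
    return exercised;
  }
}
```

In the real test class, whatever builds the config override would take the flag from the injected parameter instead of hard-coding `return true`.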
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java:
##########
@@ -36,6 +36,9 @@ public class TaskLockConfig
@JsonProperty
private long batchAllocationWaitTime = 0L;
+ @JsonProperty
+ private boolean segmentAllocationReduceMetadataIO = false;
Review Comment:
This is used only for batch segment allocation, IIUC. Let's rename it to
`batchAllocationReduceMetadataIO`.
Once this has been tested thoroughly, we can remove the flag altogether and
use the new approach for both regular and batch allocation.
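A minimal sketch of the rename, assuming (as the comment says, IIUC) that the flag is read only on the batch allocation path. `TaskLockConfigSketch` and `AllocationPathSketch.useReducedMetadataIO` are hypothetical; only the field names and defaults come from the diff and the suggestion above.

```java
// Hypothetical, dependency-free sketch of TaskLockConfig after the suggested rename.
// The real class binds these fields with Jackson's @JsonProperty, omitted here.
class TaskLockConfigSketch {
  private long batchAllocationWaitTime = 0L;
  // Off by default, so allocation behaves as before unless explicitly enabled.
  private boolean batchAllocationReduceMetadataIO = false;

  TaskLockConfigSketch() {
  }

  TaskLockConfigSketch(boolean batchAllocationReduceMetadataIO) {
    this.batchAllocationReduceMetadataIO = batchAllocationReduceMetadataIO;
  }

  long getBatchAllocationWaitTime() {
    return batchAllocationWaitTime;
  }

  boolean isBatchAllocationReduceMetadataIO() {
    return batchAllocationReduceMetadataIO;
  }
}

class AllocationPathSketch {
  // The reduced-IO lookup applies only to batch allocation; regular (non-batch)
  // allocation ignores the flag.
  static boolean useReducedMetadataIO(boolean batchAllocation, TaskLockConfigSketch config) {
    return batchAllocation && config.isBatchAllocationReduceMetadataIO();
  }
}
```

Once the flag is removed, the check above disappears and both paths would use the new lookup unconditionally.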
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java:
##########
@@ -87,6 +87,8 @@ public class SegmentAllocationQueue
private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> keyToBatch = new ConcurrentHashMap<>();
private final BlockingDeque<AllocateRequestBatch> processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE);
+ private final boolean skipSegmentPayloadFetchForAllocation;
Review Comment:
Please rename as suggested.
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java:
##########
@@ -99,6 +121,133 @@ public void tearDown()
emitter.flush();
}
+ @Test
+ @Ignore
Review Comment:
Let's remove this test if we are not running it as a unit test. If it is meant
to be a benchmark, please put it in a `Benchmark` class and share the details
of a sample run in the PR description.
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2900,6 +2929,70 @@ public int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set<String
);
}
+ @VisibleForTesting
+ Set<DataSegment> retrieveUsedSegmentsForAllocation(
+ final Handle handle,
+ final String dataSource,
+ final Interval interval
+ )
+ {
+ final Set<SegmentId> overlappingSegmentIds = SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
+ .retrieveUsedSegmentIds(
+ dataSource,
+ interval
+ );
+ // Map from version -> interval -> segmentId with the smallest partitionNum
+ Map<String, Map<Interval, SegmentId>> versionIntervalToSmallestSegmentId = new HashMap<>();
+ for (SegmentId segmentId : overlappingSegmentIds) {
+ final Map<Interval, SegmentId> map
+ = versionIntervalToSmallestSegmentId.computeIfAbsent(segmentId.getVersion(), v -> new HashMap<>());
+ final SegmentId value = map.get(segmentId.getInterval());
+ if (value == null || value.getPartitionNum() > segmentId.getPartitionNum()) {
+ map.put(interval, segmentId);
+ }
+ }
+
+ // Retrieve the segments for the ids stored in the map to get the numCorePartitions
+ final Set<String> segmentIdsToRetrieve = new HashSet<>();
+ for (Map<Interval, SegmentId> itvlMap : versionIntervalToSmallestSegmentId.values()) {
+ segmentIdsToRetrieve.addAll(itvlMap.values().stream().map(SegmentId::toString).collect(Collectors.toList()));
+ }
+ final Set<DataSegment> dataSegments = retrieveSegmentsById(dataSource, segmentIdsToRetrieve);
+ final Set<String> retrievedIds = new HashSet<>();
+ final Map<String, Map<Interval, Integer>> versionIntervalToNumCorePartitions = new HashMap<>();
+ for (DataSegment segment : dataSegments) {
+ versionIntervalToNumCorePartitions.computeIfAbsent(segment.getVersion(), v -> new HashMap<>())
+ .put(segment.getInterval(), segment.getShardSpec().getNumCorePartitions());
+ retrievedIds.add(segment.getId().toString());
+ }
+ if (!retrievedIds.equals(segmentIdsToRetrieve)) {
+ throw DruidException.defensive(
+ "Cannot create DataSegments for segment allocations."
+ + "The used segments may have changed for dataSource[%s] and interval[%s].",
+ dataSource, interval
+ );
+ }
+
+ // Populate the required segment info
Review Comment:
```suggestion
// Create dummy segments for each segmentId with only the shard spec populated
```
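The two-step lookup in `retrieveUsedSegmentsForAllocation` can be modelled without Druid types, which may help when reviewing it. This is a hypothetical sketch: `SegmentIdSketch`, `PlaceholderSegment` and `AllocationLookupSketch` are invented stand-ins for `SegmentId`, `DataSegment` and the coordinator method; intervals are plain strings rather than Joda-Time `Interval`s, and the payload fetch is replaced by a caller-supplied `numCorePartitions` map. One difference from the quoted diff: the sketch keys the inner map by each id's own interval, as the `version -> interval -> segmentId` comment describes, whereas the diff reads with `segmentId.getInterval()` but writes with `map.put(interval, segmentId)`, i.e. the query interval, which looks unintended.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Druid's SegmentId (interval kept as a plain string).
final class SegmentIdSketch {
  final String version;
  final String interval;
  final int partitionNum;

  SegmentIdSketch(String version, String interval, int partitionNum) {
    this.version = version;
    this.interval = interval;
    this.partitionNum = partitionNum;
  }
}

// Hypothetical "dummy segment": an id plus numCorePartitions, the only shard-spec
// detail this lookup fetches.
final class PlaceholderSegment {
  final SegmentIdSketch id;
  final int numCorePartitions;

  PlaceholderSegment(SegmentIdSketch id, int numCorePartitions) {
    this.id = id;
    this.numCorePartitions = numCorePartitions;
  }
}

final class AllocationLookupSketch {
  // Step 1: for each (version, interval), keep the id with the smallest partitionNum.
  // Only these ids need their full payload fetched to learn numCorePartitions.
  static Map<String, Map<String, SegmentIdSketch>> smallestPerVersionInterval(List<SegmentIdSketch> ids) {
    Map<String, Map<String, SegmentIdSketch>> result = new HashMap<>();
    for (SegmentIdSketch id : ids) {
      result.computeIfAbsent(id.version, v -> new HashMap<>())
            // Keyed by the id's own interval, so lookup and insert use the same key.
            .merge(id.interval, id, (kept, candidate) -> kept.partitionNum <= candidate.partitionNum ? kept : candidate);
    }
    return result;
  }

  // Step 2: build a placeholder for every overlapping id, copying numCorePartitions
  // from the payload fetched for its (version, interval). A missing entry plays the
  // role of the diff's defensive check that the used segments did not change.
  static List<PlaceholderSegment> buildPlaceholders(
      List<SegmentIdSketch> ids,
      Map<String, Map<String, Integer>> numCorePartitionsByVersionInterval
  ) {
    List<PlaceholderSegment> placeholders = new ArrayList<>();
    for (SegmentIdSketch id : ids) {
      Integer numCore = numCorePartitionsByVersionInterval
          .getOrDefault(id.version, Collections.emptyMap())
          .get(id.interval);
      if (numCore == null) {
        throw new IllegalStateException(
            "Used segments may have changed for version " + id.version + ", interval " + id.interval);
      }
      placeholders.add(new PlaceholderSegment(id, numCore));
    }
    return placeholders;
  }
}
```

With several partitions per (version, interval), step 1 fetches one payload per group instead of one per segment, which is the metadata IO the flag is meant to save.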
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -702,12 +711,12 @@ private TaskLockPosse createNewTaskLockPosse(LockRequest request)
* for the given requests. Updates the holder with the allocated segment if
* the allocation succeeds, otherwise marks it as failed.
*/
- @VisibleForTesting
- void allocateSegmentIds(
Review Comment:
thanks!
--
This is an automated message from the Apache Git Service.