github-advanced-security[bot] commented on code in PR #17420:
URL: https://github.com/apache/druid/pull/17420#discussion_r1817695838
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java:
##########
@@ -99,6 +121,133 @@
emitter.flush();
}
+ @Test
+ @Ignore
+ public void testLongQueue() throws Exception
+ {
+ allocationQueue.start();
+ allocationQueue.becomeLeader();
+ final Task task = NoopTask.create();
+ taskActionTestKit.getTaskLockbox().add(task);
+
+ final Interval startInterval = Intervals.of("2024-01-01/PT1H");
+ final long hourToMillis = 1000L * 60L * 60L;
+ final long numHours = 48;
+ List<Interval> intervals = new ArrayList<>();
+ for (long hour = 0; hour < numHours; hour++) {
+ intervals.add(
+ new Interval(
+ startInterval.getStartMillis() + hourToMillis * hour,
+ startInterval.getStartMillis() + hourToMillis * hour +
hourToMillis,
+ DateTimeZone.UTC
+ )
+ );
+ }
+
+ final IndexerSQLMetadataStorageCoordinator coordinator =
+ (IndexerSQLMetadataStorageCoordinator)
taskActionTestKit.getMetadataStorageCoordinator();
+
+ final List<String> dimensions = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ dimensions.add("dimension_" + i);
+ }
+ final Set<DataSegment> segments = new HashSet<>();
+ final int numUsedSegmentsPerInterval = 2000;
+ int version = 0;
+ for (Interval interval : intervals) {
+ for (int i = 0; i < numUsedSegmentsPerInterval; i++) {
+ segments.add(
+ DataSegment.builder()
+ .dataSource(task.getDataSource())
+ .interval(interval)
+ .version("version" + version)
+ .shardSpec(new NumberedShardSpec(i,
numUsedSegmentsPerInterval))
+ .dimensions(dimensions)
+ .size(100)
+ .build()
+ );
+ }
+ coordinator.commitSegments(segments, null);
+ segments.clear();
+ version = (version + 1) % 10;
+ }
+
+
+ final int numAllocations = 10;
+ final int replicas = 2;
+ Map<String, List<Future<SegmentIdWithShardSpec>>>
sequenceNameAndPrevIdToFutures = new HashMap<>();
+ for (int i = 0; i < numAllocations; i++) {
+ for (int j = 0; j < numHours; j++) {
+ final int id = numUsedSegmentsPerInterval + i / replicas;
+ final String sequenceId = j + "-sequence" + id;
+ final String prevSequenceId = j + "-sequence" + (id - 1);
+ sequenceNameAndPrevIdToFutures.computeIfAbsent(
+ sequenceId + "|" + prevSequenceId,
+ k -> new ArrayList<>()
+ ).add(
+ allocationQueue.add(
+ new SegmentAllocateRequest(
+ task,
+ new SegmentAllocateAction(
+ task.getDataSource(),
+ intervals.get(j).getStart(),
+ Granularities.NONE,
+ Granularities.HOUR,
+ sequenceId,
+ prevSequenceId,
+ false,
+ NumberedPartialShardSpec.instance(),
+ LockGranularity.TIME_CHUNK,
+ TaskLockType.APPEND
+ ),
+ 10
+ )
+ )
+ );
+ }
+ }
+ executor.finishAllPendingTasks();
+
+ final Set<SegmentIdWithShardSpec> allocatedIds = new HashSet<>();
+ for (List<Future<SegmentIdWithShardSpec>> sameIds :
sequenceNameAndPrevIdToFutures.values()) {
+ if (sameIds.isEmpty()) {
+ return;
+ }
+ final SegmentIdWithShardSpec id = sameIds.get(0).get();
+ Assert.assertNotNull(id);
+ for (int i = 1; i < sameIds.size(); i++) {
+ Assert.assertEquals(id, sameIds.get(i).get());
+ }
+ Assert.assertFalse(allocatedIds.contains(id));
+ allocatedIds.add(id);
+ }
+ Assert.assertEquals(numHours * numAllocations, allocatedIds.size() *
replicas);
Review Comment:
## Result of multiplication cast to wider type
Potential overflow in int multiplication (`allocatedIds.size() * replicas`) before it is converted to long by use in an invocation context.
[Show more details](https://github.com/apache/druid/security/code-scanning/8435)
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2900,6 +2929,70 @@
);
}
+ @VisibleForTesting
+ Set<DataSegment> retrieveUsedSegmentsForAllocation(
+ final Handle handle,
+ final String dataSource,
+ final Interval interval
+ )
+ {
+ final Set<SegmentId> overlappingSegmentIds =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
+
.retrieveUsedSegmentIds(
+
dataSource,
+
interval
+ );
+ // Map from version -> interval -> segmentId with the smallest partitionNum
+ Map<String, Map<Interval, SegmentId>> versionIntervalToSmallestSegmentId =
new HashMap<>();
+ for (SegmentId segmentId : overlappingSegmentIds) {
+ final Map<Interval, SegmentId> map
+ =
versionIntervalToSmallestSegmentId.computeIfAbsent(segmentId.getVersion(), v ->
new HashMap<>());
+ final SegmentId value = map.get(segmentId.getInterval());
+ if (value == null || value.getPartitionNum() >
segmentId.getPartitionNum()) {
+ map.put(interval, segmentId);
+ }
+ }
+
+ // Retrieve the segments for the ids stored in the map to get the
numCorePartitions
+ final Set<String> segmentIdsToRetrieve = new HashSet<>();
+ for (Map<Interval, SegmentId> itvlMap :
versionIntervalToSmallestSegmentId.values()) {
+
segmentIdsToRetrieve.addAll(itvlMap.values().stream().map(SegmentId::toString).collect(Collectors.toList()));
+ }
+ final Set<DataSegment> dataSegments = retrieveSegmentsById(dataSource,
segmentIdsToRetrieve);
+ final Set<String> retrievedIds = new HashSet<>();
+ final Map<String, Map<Interval, Integer>>
versionIntervalToNumCorePartitions = new HashMap<>();
+ for (DataSegment segment : dataSegments) {
+ versionIntervalToNumCorePartitions.computeIfAbsent(segment.getVersion(),
v -> new HashMap<>())
+ .put(segment.getInterval(),
segment.getShardSpec().getNumCorePartitions());
+ retrievedIds.add(segment.getId().toString());
+ }
+ if (!retrievedIds.equals(segmentIdsToRetrieve)) {
+ throw DruidException.defensive(
+ "Cannot create DataSegments for segment allocations."
+ + "The used segments may have changed for dataSource[%s] and
interval[%s].",
Review Comment:
## Missing space in string literal
This string appears to be missing a space after 'allocations.': the concatenated message reads "...segment allocations.The used segments may have changed...".
[Show more details](https://github.com/apache/druid/security/code-scanning/8436)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]