github-advanced-security[bot] commented on code in PR #17420:
URL: https://github.com/apache/druid/pull/17420#discussion_r1817695838


##########
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java:
##########
@@ -99,6 +121,133 @@
     emitter.flush();
   }
 
+  @Test
+  @Ignore
+  public void testLongQueue() throws Exception
+  {
+    allocationQueue.start();
+    allocationQueue.becomeLeader();
+    final Task task = NoopTask.create();
+    taskActionTestKit.getTaskLockbox().add(task);
+
+    final Interval startInterval = Intervals.of("2024-01-01/PT1H");
+    final long hourToMillis = 1000L * 60L * 60L;
+    final long numHours = 48;
+    List<Interval> intervals = new ArrayList<>();
+    for (long hour = 0; hour < numHours; hour++) {
+      intervals.add(
+          new Interval(
+              startInterval.getStartMillis() + hourToMillis * hour,
+              startInterval.getStartMillis() + hourToMillis * hour + hourToMillis,
+              DateTimeZone.UTC
+          )
+      );
+    }
+
+    final IndexerSQLMetadataStorageCoordinator coordinator =
+        (IndexerSQLMetadataStorageCoordinator) taskActionTestKit.getMetadataStorageCoordinator();
+
+    final List<String> dimensions = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      dimensions.add("dimension_" + i);
+    }
+    final Set<DataSegment> segments = new HashSet<>();
+    final int numUsedSegmentsPerInterval = 2000;
+    int version = 0;
+    for (Interval interval : intervals) {
+      for (int i = 0; i < numUsedSegmentsPerInterval; i++) {
+        segments.add(
+            DataSegment.builder()
+                       .dataSource(task.getDataSource())
+                       .interval(interval)
+                       .version("version" + version)
+                       .shardSpec(new NumberedShardSpec(i, numUsedSegmentsPerInterval))
+                       .dimensions(dimensions)
+                       .size(100)
+                       .build()
+        );
+      }
+      coordinator.commitSegments(segments, null);
+      segments.clear();
+      version = (version + 1) % 10;
+    }
+
+
+    final int numAllocations = 10;
+    final int replicas = 2;
+    Map<String, List<Future<SegmentIdWithShardSpec>>> sequenceNameAndPrevIdToFutures = new HashMap<>();
+    for (int i = 0; i < numAllocations; i++) {
+      for (int j = 0; j < numHours; j++) {
+        final int id = numUsedSegmentsPerInterval + i / replicas;
+        final String sequenceId = j + "-sequence" + id;
+        final String prevSequenceId = j + "-sequence" + (id - 1);
+        sequenceNameAndPrevIdToFutures.computeIfAbsent(
+            sequenceId + "|" + prevSequenceId,
+            k -> new ArrayList<>()
+        ).add(
+            allocationQueue.add(
+                new SegmentAllocateRequest(
+                    task,
+                    new SegmentAllocateAction(
+                        task.getDataSource(),
+                        intervals.get(j).getStart(),
+                        Granularities.NONE,
+                        Granularities.HOUR,
+                        sequenceId,
+                        prevSequenceId,
+                        false,
+                        NumberedPartialShardSpec.instance(),
+                        LockGranularity.TIME_CHUNK,
+                        TaskLockType.APPEND
+                    ),
+                    10
+                )
+            )
+        );
+      }
+    }
+    executor.finishAllPendingTasks();
+
+    final Set<SegmentIdWithShardSpec> allocatedIds = new HashSet<>();
+    for (List<Future<SegmentIdWithShardSpec>> sameIds : sequenceNameAndPrevIdToFutures.values()) {
+      if (sameIds.isEmpty()) {
+        return;
+      }
+      final SegmentIdWithShardSpec id = sameIds.get(0).get();
+      Assert.assertNotNull(id);
+      for (int i = 1; i < sameIds.size(); i++) {
+        Assert.assertEquals(id, sameIds.get(i).get());
+      }
+      Assert.assertFalse(allocatedIds.contains(id));
+      allocatedIds.add(id);
+    }
+    Assert.assertEquals(numHours * numAllocations, allocatedIds.size() * replicas);

Review Comment:
   ## Result of multiplication cast to wider type
   
   Potential overflow in [int multiplication](1) before it is converted to long 
by use in an invocation context.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/8435)



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2900,6 +2929,70 @@
     );
   }
 
+  @VisibleForTesting
+  Set<DataSegment> retrieveUsedSegmentsForAllocation(
+      final Handle handle,
+      final String dataSource,
+      final Interval interval
+  )
+  {
+    final Set<SegmentId> overlappingSegmentIds = SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
+                                                                         .retrieveUsedSegmentIds(
+                                                                             dataSource,
+                                                                             interval
+                                                                         );
+    // Map from version -> interval -> segmentId with the smallest partitionNum
+    Map<String, Map<Interval, SegmentId>> versionIntervalToSmallestSegmentId = new HashMap<>();
+    for (SegmentId segmentId : overlappingSegmentIds) {
+      final Map<Interval, SegmentId> map
+          = versionIntervalToSmallestSegmentId.computeIfAbsent(segmentId.getVersion(), v -> new HashMap<>());
+      final SegmentId value = map.get(segmentId.getInterval());
+      if (value == null || value.getPartitionNum() > segmentId.getPartitionNum()) {
+        map.put(interval, segmentId);
+      }
+    }
+
+    // Retrieve the segments for the ids stored in the map to get the numCorePartitions
+    final Set<String> segmentIdsToRetrieve = new HashSet<>();
+    for (Map<Interval, SegmentId> itvlMap : versionIntervalToSmallestSegmentId.values()) {
+      segmentIdsToRetrieve.addAll(itvlMap.values().stream().map(SegmentId::toString).collect(Collectors.toList()));
+    }
+    final Set<DataSegment> dataSegments = retrieveSegmentsById(dataSource, segmentIdsToRetrieve);
+    final Set<String> retrievedIds = new HashSet<>();
+    final Map<String, Map<Interval, Integer>> versionIntervalToNumCorePartitions = new HashMap<>();
+    for (DataSegment segment : dataSegments) {
+      versionIntervalToNumCorePartitions.computeIfAbsent(segment.getVersion(), v -> new HashMap<>())
+                                        .put(segment.getInterval(), segment.getShardSpec().getNumCorePartitions());
+      retrievedIds.add(segment.getId().toString());
+    }
+    if (!retrievedIds.equals(segmentIdsToRetrieve)) {
+      throw DruidException.defensive(
+          "Cannot create DataSegments for segment allocations."
+          + "The used segments may have changed for dataSource[%s] and interval[%s].",

Review Comment:
   ## Missing space in string literal
   
   This string appears to be missing a space after 'allocations.'.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/8436)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to