github-code-scanning[bot] commented on code in PR #14407:
URL: https://github.com/apache/druid/pull/14407#discussion_r1301235591


##########
server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java:
##########
@@ -451,10 +468,353 @@
     );
   }
 
+  private Map<String, TaskLockInfo> getAppendedSegmentIds(String datasource, 
Set<TaskLockInfo> replaceLocks)
+  {
+    return derbyConnector.retryWithHandle(
+        handle -> {
+          return coordinator.getAppendedSegmentIds(handle, datasource, 
replaceLocks);
+        }
+    );
+  }
+
+  private Boolean insertIntoSegmentVersionsTable(Map<DataSegment, 
TaskLockInfo> segmentToTaskLockMap)
+  {
+    final String table = 
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentVersionsTable();
+    return derbyConnector.retryWithHandle(
+        handle -> {
+          PreparedBatch preparedBatch = handle.prepareBatch(
+              StringUtils.format(
+                  StringUtils.format(
+                      "INSERT INTO %1$s (id, dataSource, start, %2$send%2$s, 
segment_id, lock_version) "
+                      + "VALUES (:id, :dataSource, :start, :end, :segment_id, 
:lock_version)",
+                      table,
+                      derbyConnector.getQuoteString()
+                  )
+              )
+          );
+          for (Map.Entry<DataSegment, TaskLockInfo> entry : 
segmentToTaskLockMap.entrySet()) {
+            final DataSegment segment = entry.getKey();
+            final TaskLockInfo lock = entry.getValue();
+            preparedBatch.add()
+                         .bind("id", segment.getId().toString() + ":" + 
lock.hashCode())
+                         .bind("dataSource", segment.getDataSource())
+                         .bind("start", lock.getInterval().getStartMillis())
+                         .bind("end", lock.getInterval().getEndMillis())
+                         .bind("segment_id", segment.getId().toString())
+                         .bind("lock_version", lock.getVersion());
+          }
+
+          final int[] affectedRows = preparedBatch.execute();
+          final boolean succeeded = 
Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1);
+          if (!succeeded) {
+            throw new ISE("Failed to publish segment to lock metadata mapping 
to DB");
+          }
+          return true;
+        }
+    );
+  }
+
+  @Test
+  public void testAllocateNewSegmentIds()
+  {
+    final String v0 = "1970-01-01";
+    final String v1 = "2023-01-03";
+    final String v2 = "2023-02-01";
+
+    final Set<DataSegment> day1 = new HashSet<>();
+    for (int i = 0; i < 10; i++) {
+      final DataSegment segment = new DataSegment(
+          "foo",
+          Intervals.of("2023-01-01/2023-01-02"),
+          v0,
+          ImmutableMap.of("path", "a-" + i),
+          ImmutableList.of("dim1"),
+          ImmutableList.of("m1"),
+          new LinearShardSpec(i),
+          9,
+          100
+      );
+      day1.add(segment);
+    }
+    final Set<DataSegment> day2 = new HashSet<>();
+    for (int i = 0; i < 10; i++) {
+      final DataSegment segment = new DataSegment(
+          "foo",
+          Intervals.of("2023-01-02/2023-01-03"),
+          v0,
+          ImmutableMap.of("path", "b-" + i),
+          ImmutableList.of("dim1"),
+          ImmutableList.of("m1"),
+          new LinearShardSpec(i),
+          9,
+          100
+      );
+      day2.add(segment);
+    }
+    final Set<DataSegment> day3 = new HashSet<>();
+    for (int i = 0; i < 10; i++) {
+      final DataSegment segment = new DataSegment(
+          "foo",
+          Intervals.of("2023-01-03/2023-01-04"),
+          v0,
+          ImmutableMap.of("path", "c-" + i),
+          ImmutableList.of("dim1"),
+          ImmutableList.of("m1"),
+          new LinearShardSpec(i),
+          9,
+          100
+      );
+      day3.add(segment);
+    }
+    final Set<DataSegment> month2 = new HashSet<>();
+    for (int i = 0; i < 10; i++) {
+      final DataSegment segment = new DataSegment(
+          "foo",
+          Intervals.of("2023-02-01/2023-03-01"),
+          v0,
+          ImmutableMap.of("path", "x-" + i),
+          ImmutableList.of("dim1"),
+          ImmutableList.of("m1"),
+          new LinearShardSpec(i),
+          9,
+          100
+      );
+      month2.add(segment);
+    }
+
+    final Set<DataSegment> higherVersionUsedSegments = new HashSet<>();
+    for (int i = 0; i < 10; i++) {
+      final DataSegment segment = new DataSegment(
+          "foo",
+          Intervals.of("2023-01-01/2023-01-02"),
+          v1,
+          ImmutableMap.of("path", "d-" + i),
+          ImmutableList.of("dim1"),
+          ImmutableList.of("m1"),
+          new NumberedShardSpec(i, 5),
+          9,
+          100
+      );
+      higherVersionUsedSegments.add(segment);
+    }
+    for (int i = 0; i < 10; i++) {
+      final DataSegment segment = new DataSegment(
+          "foo",
+          Intervals.of("2023-01-02/2023-01-03"),
+          v1,
+          ImmutableMap.of("path", "e-" + i),
+          ImmutableList.of("dim1"),
+          ImmutableList.of("m1"),
+          new NumberedShardSpec(i, 0),
+          9,
+          100
+      );
+      higherVersionUsedSegments.add(segment);
+    }
+    for (int i = 0; i < 10; i++) {
+      final DataSegment segment = new DataSegment(
+          "foo",
+          Intervals.of("2023-01-01/2023-02-01"),
+          v2,
+          ImmutableMap.of("path", "f-" + i),
+          ImmutableList.of("dim1"),
+          ImmutableList.of("m1"),
+          new NumberedShardSpec(i, 10),
+          9,
+          100
+      );
+      higherVersionUsedSegments.add(segment);
+    }
+    insertUsedSegments(higherVersionUsedSegments);
+
+    final Set<DataSegment> segmentsToBeProcessed = new HashSet<>();
+    final Set<DataSegment> month1 = new HashSet<>();
+    month1.addAll(day1);
+    month1.addAll(day2);
+    month1.addAll(day3);
+    segmentsToBeProcessed.addAll(month1);
+    segmentsToBeProcessed.addAll(month2);
+    final Map<DataSegment, Set<SegmentIdWithShardSpec>> segmentToNewIds = 
derbyConnector.retryWithHandle(
+        handle -> {
+          return coordinator.allocateNewSegmentIds(handle, "foo", 
segmentsToBeProcessed);
+        }
+    );
+
+    for (DataSegment segment : day1) {
+      final Set<SegmentIdWithShardSpec> newIds = segmentToNewIds.get(segment);
+      Assert.assertEquals(2, newIds.size());
+      Assert.assertEquals(
+          ImmutableSet.of(v1, v2),
+          
newIds.stream().map(SegmentIdWithShardSpec::getVersion).collect(Collectors.toSet())
+      );
+    }
+    for (DataSegment segment : day2) {
+      final Set<SegmentIdWithShardSpec> newIds = segmentToNewIds.get(segment);
+      Assert.assertEquals(2, newIds.size());
+      Assert.assertEquals(
+          ImmutableSet.of(v1, v2),
+          
newIds.stream().map(SegmentIdWithShardSpec::getVersion).collect(Collectors.toSet())
+      );
+    }
+    for (DataSegment segment : day3) {
+      final Set<SegmentIdWithShardSpec> newIds = segmentToNewIds.get(segment);
+      Assert.assertEquals(1, newIds.size());
+      Assert.assertEquals(
+          ImmutableSet.of(v2),
+          
newIds.stream().map(SegmentIdWithShardSpec::getVersion).collect(Collectors.toSet())
+      );
+    }
+    for (DataSegment segment : month2) {
+      
Assert.assertTrue(CollectionUtils.isNullOrEmpty(segmentToNewIds.get(segment)));
+    }
+  }
+
+  @Test
+  public void testCommitAppendSegments()
+  {
+    final Set<DataSegment> allSegments = new HashSet<>();
+    final Set<String> segmentIdsToBeCarriedForward = new HashSet<>();
+    final TaskLockInfo lock = new 
TaskLockInfo(Intervals.of("2023-01-01/2023-01-03"), "2024-01-01");
+    final Map<DataSegment, TaskLockInfo> segmentLockMap = new HashMap<>();
+
+    for (int i = 0; i < 10; i++) {
+      final DataSegment segment = new DataSegment(
+          "foo",
+          Intervals.of("2023-01-01/2023-01-02"),
+          "2023-01-01",
+          ImmutableMap.of("path", "a-" + i),
+          ImmutableList.of("dim1"),
+          ImmutableList.of("m1"),
+          new LinearShardSpec(i),
+          9,
+          100
+      );
+      allSegments.add(segment);
+      segmentIdsToBeCarriedForward.add(segment.getId().toString());
+      segmentLockMap.put(segment, lock);
+    }
+
+    for (int i = 0; i < 10; i++) {
+      final DataSegment segment = new DataSegment(
+          "foo",
+          Intervals.of("2023-01-02/2023-01-03"),
+          "2023-01-02",
+          ImmutableMap.of("path", "b-" + i),
+          ImmutableList.of("dim1"),
+          ImmutableList.of("m1"),
+          new LinearShardSpec(i),
+          9,
+          100
+      );
+      allSegments.add(segment);
+      segmentIdsToBeCarriedForward.add(segment.getId().toString());
+      segmentLockMap.put(segment, lock);
+    }
+
+    for (int i = 0; i < 10; i++) {
+      final DataSegment segment = new DataSegment(
+          "foo",
+          Intervals.of("2023-01-03/2023-01-04"),
+          "2023-01-03",
+          ImmutableMap.of("path", "c-" + i),
+          ImmutableList.of("dim1"),
+          ImmutableList.of("m1"),
+          new LinearShardSpec(i),
+          9,
+          100
+      );
+      allSegments.add(segment);
+    }
+
+    coordinator.commitAppendSegments(allSegments, null, null, segmentLockMap);
+
+    Assert.assertEquals(
+        
allSegments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet()),
+        ImmutableSet.copyOf(retrieveUsedSegmentIds())
+    );
+
+    final Set<TaskLockInfo> replaceLocks = Collections.singleton(lock);
+    final Map<String, TaskLockInfo> segmentLockMetadata = 
getAppendedSegmentIds("foo", replaceLocks);
+    Assert.assertEquals(segmentIdsToBeCarriedForward, 
segmentLockMetadata.keySet());
+    Assert.assertEquals(
+        lock,
+        
Iterables.getOnlyElement(ImmutableSet.copyOf(segmentLockMetadata.values()))
+    );
+  }
+
+
+  @Test
+  public void testCommitReplaceSegments()
+  {
+    final TaskLockInfo replaceLock = new 
TaskLockInfo(Intervals.of("2023-01-01/2023-02-01"), "2023-02-01");
+    final Set<DataSegment> segmentsAppendedWithReplaceLock = new HashSet<>();
+    final Map<DataSegment, TaskLockInfo> appendedSegmentToReplaceLockMap = new 
HashMap<>();
+    for (int i = 1; i < 9; i++) {
+      final DataSegment segment = new DataSegment(
+          "foo",
+          Intervals.of("2023-01-0" + i + "/2023-01-0" + (i + 1)),
+          "2023-01-0" + i,
+          ImmutableMap.of("path", "a-" + i),
+          ImmutableList.of("dim1"),
+          ImmutableList.of("m1"),
+          new LinearShardSpec(0),
+          9,
+          100
+      );
+      segmentsAppendedWithReplaceLock.add(segment);
+      appendedSegmentToReplaceLockMap.put(segment, replaceLock);
+    }
+    insertUsedSegments(segmentsAppendedWithReplaceLock);
+    insertIntoSegmentVersionsTable(appendedSegmentToReplaceLockMap);
+
+    final Set<DataSegment> replacingSegments = new HashSet<>();
+    for (int i = 1; i < 9; i++) {
+      final DataSegment segment = new DataSegment(
+          "foo",
+          Intervals.of("2023-01-01/2023-02-01"),
+          "2023-02-01",
+          ImmutableMap.of("path", "b-" + i),
+          ImmutableList.of("dim1"),
+          ImmutableList.of("m1"),
+          new NumberedShardSpec(i, 9),
+          9,
+          100
+      );
+      replacingSegments.add(segment);
+    }
+
+    coordinator.commitReplaceSegments(replacingSegments, 
ImmutableSet.of(replaceLock));
+
+    Assert.assertEquals(
+        2 * segmentsAppendedWithReplaceLock.size() + replacingSegments.size(),

Review Comment:
   ## Result of multiplication cast to wider type
   
   Potential overflow in [int multiplication](1) (`2 * segmentsAppendedWithReplaceLock.size()`, passed to `Assert.assertEquals`) before it is converted to long by use in an invocation context.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/5728)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to