github-advanced-security[bot] commented on code in PR #18996:
URL: https://github.com/apache/druid/pull/18996#discussion_r2780078245
##########
server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java:
##########
@@ -80,43 +80,48 @@
* This method assumes that the given candidate is eligible for compaction
* based on the current compaction config/supervisor of the datasource.
*/
- public CompactionStatus computeCompactionStatus(
- CompactionCandidate candidate,
- CompactionCandidateSearchPolicy searchPolicy
- )
+ public CompactionCandidate.TaskState
computeCompactionTaskState(CompactionCandidate candidate)
{
// Skip intervals that already have a running task
final CompactionTaskStatus lastTaskStatus = getLatestTaskStatus(candidate);
if (lastTaskStatus != null && lastTaskStatus.getState() ==
TaskState.RUNNING) {
- return CompactionStatus.running("Task for interval is already running");
+ return CompactionCandidate.TaskState.TASK_IN_PROGRESS;
}
// Skip intervals that have been recently compacted if segment timeline is
not updated yet
final DateTime snapshotTime = segmentSnapshotTime.get();
if (lastTaskStatus != null
&& lastTaskStatus.getState() == TaskState.SUCCESS
&& snapshotTime != null &&
snapshotTime.isBefore(lastTaskStatus.getUpdatedTime())) {
- return CompactionStatus.skipped(
- "Segment timeline not updated since last compaction task succeeded"
- );
+ return CompactionCandidate.TaskState.RECENTLY_COMPLETED;
}
- // Skip intervals that have been filtered out by the policy
- final CompactionCandidateSearchPolicy.Eligibility eligibility
- = searchPolicy.checkEligibilityForCompaction(candidate,
lastTaskStatus);
- if (eligibility.isEligible()) {
- return CompactionStatus.pending("Not compacted yet");
- } else {
- return CompactionStatus.skipped("Rejected by search policy: %s",
eligibility.getReason());
- }
+ return CompactionCandidate.TaskState.READY;
}
/**
* Tracks the latest compaction status of the given compaction candidates.
* Used only by the {@link CompactionRunSimulator}.
*/
- public void onCompactionStatusComputed(
+ public void onSkippedCandidate(
+ CompactionCandidate candidateSegments,
+ DataSourceCompactionConfig config
Review Comment:
## Useless parameter
The parameter 'config' is never used.
[Show more
details](https://github.com/apache/druid/security/code-scanning/10812)
##########
server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java:
##########
@@ -559,14 +560,149 @@
ImmutableMap.of("path", "b-" + i),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
- new NumberedShardSpec(i, 9),
+ new NumberedShardSpec(i - 1, 8),
+ 9,
+ 100
+ );
+ replacingSegments.add(segment);
+ }
+
+ Assert.assertTrue(coordinator.commitReplaceSegments(replacingSegments,
Set.of(replaceLock), null).isSuccess());
+
+ Assert.assertEquals(
+ 2L * segmentsAppendedWithReplaceLock.size() + replacingSegments.size(),
+
retrieveUsedSegmentIds(derbyConnectorRule.metadataTablesConfigSupplier().get()).size()
+ );
+
+ final Set<DataSegment> usedSegments
+ = new
HashSet<>(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()));
+
+ final Map<String, String> upgradedFromSegmentIdMap =
coordinator.retrieveUpgradedFromSegmentIds(
+ "foo",
+
usedSegments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet())
+ );
+
+
Assert.assertTrue(usedSegments.containsAll(segmentsAppendedWithReplaceLock));
+ for (DataSegment appendSegment : segmentsAppendedWithReplaceLock) {
+
Assert.assertNull(upgradedFromSegmentIdMap.get(appendSegment.getId().toString()));
+ }
+ usedSegments.removeAll(segmentsAppendedWithReplaceLock);
+ Assert.assertEquals(usedSegments,
coordinator.retrieveAllUsedSegments("foo", Segments.ONLY_VISIBLE));
+
+ Assert.assertTrue(usedSegments.containsAll(replacingSegments));
+ for (DataSegment replaceSegment : replacingSegments) {
+
Assert.assertNull(upgradedFromSegmentIdMap.get(replaceSegment.getId().toString()));
+ }
+ usedSegments.removeAll(replacingSegments);
+
+ Assert.assertEquals(segmentsAppendedWithReplaceLock.size(),
usedSegments.size());
+ for (DataSegment segmentReplicaWithNewVersion : usedSegments) {
+ boolean hasBeenCarriedForward = false;
+ for (DataSegment appendedSegment : segmentsAppendedWithReplaceLock) {
+ if
(appendedSegment.getLoadSpec().equals(segmentReplicaWithNewVersion.getLoadSpec()))
{
+ Assert.assertEquals(
+ appendedSegment.getId().toString(),
+
upgradedFromSegmentIdMap.get(segmentReplicaWithNewVersion.getId().toString())
+ );
+ hasBeenCarriedForward = true;
+ break;
+ }
+ }
+ Assert.assertTrue(hasBeenCarriedForward);
+ }
+
+ List<PendingSegmentRecord> pendingSegmentsInInterval =
+ coordinator.getPendingSegments("foo",
Intervals.of("2023-01-01/2023-02-01"));
+ Assert.assertEquals(2, pendingSegmentsInInterval.size());
+ final SegmentId rootPendingSegmentId =
pendingSegmentInInterval.getId().asSegmentId();
+ if (pendingSegmentsInInterval.get(0).getUpgradedFromSegmentId() == null) {
+ Assert.assertEquals(rootPendingSegmentId,
pendingSegmentsInInterval.get(0).getId().asSegmentId());
+ Assert.assertEquals(rootPendingSegmentId.toString(),
pendingSegmentsInInterval.get(1).getUpgradedFromSegmentId());
+ } else {
+ Assert.assertEquals(rootPendingSegmentId,
pendingSegmentsInInterval.get(1).getId().asSegmentId());
+ Assert.assertEquals(rootPendingSegmentId.toString(),
pendingSegmentsInInterval.get(0).getUpgradedFromSegmentId());
+ }
+
+ List<PendingSegmentRecord> pendingSegmentsOutsideInterval =
+ coordinator.getPendingSegments("foo",
Intervals.of("2023-04-01/2023-05-01"));
+ Assert.assertEquals(1, pendingSegmentsOutsideInterval.size());
+ Assert.assertEquals(
+ pendingSegmentOutsideInterval.getId().asSegmentId(),
pendingSegmentsOutsideInterval.get(0).getId().asSegmentId()
+ );
+ }
+
+ @Test
+ public void testCommitReplaceSegmentsWithUpdatedCorePartitions()
+ {
+ // this test is very similar to testCommitReplaceSegments, except both
append/replace segments use DimensionRangeShardSpec
+ final ReplaceTaskLock replaceLock = new ReplaceTaskLock("g1",
Intervals.of("2023-01-01/2023-02-01"), "2023-02-01");
+ final Set<DataSegment> segmentsAppendedWithReplaceLock = new HashSet<>();
+ final Map<DataSegment, ReplaceTaskLock> appendedSegmentToReplaceLockMap =
new HashMap<>();
+ final PendingSegmentRecord pendingSegmentInInterval =
PendingSegmentRecord.create(
+ new SegmentIdWithShardSpec(
+ "foo",
+ Intervals.of("2023-01-01/2023-01-02"),
+ "2023-01-02",
+ new NumberedShardSpec(100, 0)
+ ),
+ "",
+ "",
+ null,
+ "append"
+ );
+ final PendingSegmentRecord pendingSegmentOutsideInterval =
PendingSegmentRecord.create(
+ new SegmentIdWithShardSpec(
+ "foo",
+ Intervals.of("2023-04-01/2023-04-02"),
+ "2023-01-02",
+ new NumberedShardSpec(100, 0)
+ ),
+ "",
+ "",
+ null,
+ "append"
+ );
+ for (int i = 1; i < 9; i++) {
+ final DataSegment segment = new DataSegment(
+ "foo",
+ Intervals.of("2023-01-0" + i + "/2023-01-0" + (i + 1)),
+ "2023-01-0" + i,
+ ImmutableMap.of("path", "a-" + i),
+ ImmutableList.of("dim1"),
+ ImmutableList.of("m1"),
+ new DimensionRangeShardSpec(List.of("dim1"), null, null, i - 1, 8),
+ 9,
+ 100
+ );
Review Comment:
## Deprecated method or constructor invocation
Invoking [DataSegment.DataSegment](1) should be avoided because it has been
deprecated.
[Show more
details](https://github.com/apache/druid/security/code-scanning/10810)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]