This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch 30.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/30.0.0 by this push:
new 607d4d219b3 [Backport] Do not allocate ids conflicting with existing
segment ids (#16380) (#16383)
607d4d219b3 is described below
commit 607d4d219b34996dc9229c0ccfe2286c4e366393
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Sun May 5 09:01:20 2024 +0530
[Backport] Do not allocate ids conflicting with existing segment ids
(#16380) (#16383)
---
.../common/actions/SegmentAllocateActionTest.java | 52 ++++++++++++
.../IndexerSQLMetadataStorageCoordinator.java | 99 +++++++++++++++-------
.../IndexerSQLMetadataStorageCoordinatorTest.java | 55 ++++++++++++
3 files changed, 174 insertions(+), 32 deletions(-)
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
index f2da105d269..1857f6d67f7 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
@@ -29,6 +29,8 @@ import org.apache.druid.indexing.common.SegmentLock;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
@@ -1062,6 +1064,45 @@ public class SegmentAllocateActionTest
Assert.assertEquals(Duration.ofDays(1).toMillis(),
id2.getInterval().toDurationMillis());
}
+ @Test
+ public void testSegmentIdMustNotBeReused() throws IOException
+ {
+ final IndexerMetadataStorageCoordinator coordinator =
taskActionTestKit.getMetadataStorageCoordinator();
+ final TaskLockbox lockbox = taskActionTestKit.getTaskLockbox();
+ final Task task0 = NoopTask.ofPriority(25);
+ lockbox.add(task0);
+ final NoopTask task1 = NoopTask.ofPriority(50);
+ lockbox.add(task1);
+
+ // Allocate and commit for older task task0
+ final SegmentIdWithShardSpec id0 =
+ allocate(task0, DateTimes.nowUtc(), Granularities.NONE,
Granularities.ALL, "seq", "0");
+ final DataSegment dataSegment0 = getSegmentForIdentifier(id0);
+ coordinator.commitSegments(ImmutableSet.of(dataSegment0), null);
+ lockbox.unlock(task0, Intervals.ETERNITY);
+
+ // Allocate and commit for newer task task1. Pending segments are cleaned
up
+ final SegmentIdWithShardSpec id1 =
+ allocate(task1, DateTimes.nowUtc(), Granularities.NONE,
Granularities.ALL, "seq", "1");
+ final DataSegment dataSegment1 = getSegmentForIdentifier(id1);
+ final SegmentIdWithShardSpec id2 =
+ allocate(task1, DateTimes.nowUtc(), Granularities.NONE,
Granularities.ALL, "seq", "2");
+ final DataSegment dataSegment2 = getSegmentForIdentifier(id2);
+ coordinator.commitSegments(ImmutableSet.of(dataSegment1, dataSegment2),
null);
+ // Clean up pending segments corresponding to task1's task allocator id
+ coordinator.deletePendingSegmentsForTaskAllocatorId(task1.getDataSource(),
task1.getTaskAllocatorId());
+
+ // Drop all segments
+ coordinator.markSegmentsAsUnusedWithinInterval(task0.getDataSource(),
Intervals.ETERNITY);
+
+ // Allocate another id and ensure that it doesn't exist in the
druid_segments table
+ final SegmentIdWithShardSpec theId =
+ allocate(task1, DateTimes.nowUtc(), Granularities.NONE,
Granularities.ALL, "seq", "3");
+
Assert.assertNull(coordinator.retrieveSegmentForId(theId.asSegmentId().toString(),
true));
+
+ lockbox.unlock(task1, Intervals.ETERNITY);
+ }
+
private SegmentIdWithShardSpec allocate(
final Task task,
final DateTime timestamp,
@@ -1123,4 +1164,15 @@ public class SegmentAllocateActionTest
Assert.assertEquals(expected, actual);
Assert.assertEquals(expected.getShardSpec(), actual.getShardSpec());
}
+
+ private DataSegment getSegmentForIdentifier(SegmentIdWithShardSpec
identifier)
+ {
+ return DataSegment.builder()
+ .dataSource(identifier.getDataSource())
+ .interval(identifier.getInterval())
+ .version(identifier.getVersion())
+ .shardSpec(identifier.getShardSpec())
+ .size(100)
+ .build();
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 618173a5db7..2b02f09926b 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -28,7 +28,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.inject.Inject;
@@ -1013,33 +1012,6 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
return allocatedSegmentIds;
}
- @SuppressWarnings("UnstableApiUsage")
- private String getSequenceNameAndPrevIdSha(
- SegmentCreateRequest request,
- SegmentIdWithShardSpec pendingSegmentId,
- boolean skipSegmentLineageCheck
- )
- {
- final Hasher hasher = Hashing.sha1().newHasher()
-
.putBytes(StringUtils.toUtf8(request.getSequenceName()))
- .putByte((byte) 0xff);
-
- if (skipSegmentLineageCheck) {
- final Interval interval = pendingSegmentId.getInterval();
- hasher
- .putLong(interval.getStartMillis())
- .putLong(interval.getEndMillis());
- } else {
- hasher
- .putBytes(StringUtils.toUtf8(request.getPreviousSegmentId()));
- }
-
- hasher.putByte((byte) 0xff);
- hasher.putBytes(StringUtils.toUtf8(pendingSegmentId.getVersion()));
-
- return BaseEncoding.base16().encode(hasher.hash().asBytes());
- }
-
@Nullable
private SegmentIdWithShardSpec allocatePendingSegment(
final Handle handle,
@@ -1727,7 +1699,6 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
// The number of core partitions must always be chosen from the set of
used segments in the SegmentTimeline.
// When the core partitions have been dropped, using pending segments
may lead to an incorrect state
// where the chunk is believed to have core partitions and queries
results are incorrect.
-
SegmentIdWithShardSpec pendingSegmentId = new SegmentIdWithShardSpec(
dataSource,
interval,
@@ -1739,7 +1710,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
)
);
return new PendingSegmentRecord(
- pendingSegmentId,
+ getTrueAllocatedId(pendingSegmentId),
request.getSequenceName(),
request.getPreviousSegmentId(),
request.getUpgradedFromSegmentId(),
@@ -1875,8 +1846,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
// The number of core partitions must always be chosen from the set of
used segments in the SegmentTimeline.
// When the core partitions have been dropped, using pending segments
may lead to an incorrect state
// where the chunk is believed to have core partitions and queries
results are incorrect.
-
- return new SegmentIdWithShardSpec(
+ final SegmentIdWithShardSpec allocatedId = new SegmentIdWithShardSpec(
dataSource,
interval,
Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"),
@@ -1886,7 +1856,72 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
committedMaxId == null ? 0 :
committedMaxId.getShardSpec().getNumCorePartitions()
)
);
+ return getTrueAllocatedId(allocatedId);
+ }
+ }
+
+ /**
+ * Verifies that the allocated id doesn't already exist in the druid
segments table.
+ * If it does, computes a new id from the max partition number of the unused segments
for the same datasource, version and interval.
+ * Otherwise, returns the same id.
+ * @param allocatedId The segment allocated on the basis of used and pending
segments
+ * @return a segment id that isn't already used by other unused segments
+ */
+ private SegmentIdWithShardSpec getTrueAllocatedId(SegmentIdWithShardSpec
allocatedId)
+ {
+ // Check if there is a conflict with an existing entry in the segments
table
+ if (retrieveSegmentForId(allocatedId.asSegmentId().toString(), true) ==
null) {
+ return allocatedId;
+ }
+
+ // If yes, try to compute allocated partition num using the max unused
segment shard spec
+ SegmentIdWithShardSpec unusedMaxId = getUnusedMaxId(
+ allocatedId.getDataSource(),
+ allocatedId.getInterval(),
+ allocatedId.getVersion()
+ );
+ // No unused segment. Just return the allocated id
+ if (unusedMaxId == null) {
+ return allocatedId;
+ }
+
+ int maxPartitionNum = Math.max(
+ allocatedId.getShardSpec().getPartitionNum(),
+ unusedMaxId.getShardSpec().getPartitionNum() + 1
+ );
+ return new SegmentIdWithShardSpec(
+ allocatedId.getDataSource(),
+ allocatedId.getInterval(),
+ allocatedId.getVersion(),
+ new NumberedShardSpec(
+ maxPartitionNum,
+ allocatedId.getShardSpec().getNumCorePartitions()
+ )
+ );
+ }
+
+ private SegmentIdWithShardSpec getUnusedMaxId(String datasource, Interval
interval, String version)
+ {
+ List<DataSegment> unusedSegments = retrieveUnusedSegmentsForInterval(
+ datasource,
+ interval,
+ ImmutableList.of(version),
+ null,
+ null
+ );
+
+ SegmentIdWithShardSpec unusedMaxId = null;
+ int maxPartitionNum = -1;
+ for (DataSegment unusedSegment : unusedSegments) {
+ if (unusedSegment.getInterval().equals(interval)) {
+ int partitionNum = unusedSegment.getShardSpec().getPartitionNum();
+ if (maxPartitionNum < partitionNum) {
+ maxPartitionNum = partitionNum;
+ unusedMaxId = SegmentIdWithShardSpec.fromDataSegment(unusedSegment);
+ }
+ }
}
+ return unusedMaxId;
}
@Override
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 29b7f92e8e5..fc43d7126fe 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -3192,4 +3192,59 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
SegmentTimeline segmentTimeline =
SegmentTimeline.forSegments(allUsedSegments);
Assert.assertEquals(0, segmentTimeline.lookup(interval).size());
}
+
+ @Test
+ public void testSegmentIdShouldNotBeReallocated() throws IOException
+ {
+ final SegmentIdWithShardSpec idWithNullTaskAllocator =
coordinator.allocatePendingSegment(
+ DS.WIKI,
+ "seq",
+ "0",
+ Intervals.ETERNITY,
+ NumberedPartialShardSpec.instance(),
+ "version",
+ false,
+ null
+ );
+ final DataSegment dataSegment0 = createSegment(
+ idWithNullTaskAllocator.getInterval(),
+ idWithNullTaskAllocator.getVersion(),
+ idWithNullTaskAllocator.getShardSpec()
+ );
+
+ final SegmentIdWithShardSpec idWithValidTaskAllocator =
coordinator.allocatePendingSegment(
+ DS.WIKI,
+ "seq",
+ "1",
+ Intervals.ETERNITY,
+ NumberedPartialShardSpec.instance(),
+ "version",
+ false,
+ "taskAllocatorId"
+ );
+ final DataSegment dataSegment1 = createSegment(
+ idWithValidTaskAllocator.getInterval(),
+ idWithValidTaskAllocator.getVersion(),
+ idWithValidTaskAllocator.getShardSpec()
+ );
+
+ // Commit the segments created from the allocated ids
+ coordinator.commitSegments(ImmutableSet.of(dataSegment0, dataSegment1),
null);
+ // Clean up pending segments corresponding to the valid task allocator id
+ coordinator.deletePendingSegmentsForTaskAllocatorId(DS.WIKI,
"taskAllocatorId");
+ // Mark all segments as unused
+ coordinator.markSegmentsAsUnusedWithinInterval(DS.WIKI,
Intervals.ETERNITY);
+
+ final SegmentIdWithShardSpec theId = coordinator.allocatePendingSegment(
+ DS.WIKI,
+ "seq",
+ "2",
+ Intervals.ETERNITY,
+ NumberedPartialShardSpec.instance(),
+ "version",
+ false,
+ "taskAllocatorId"
+ );
+
Assert.assertNull(coordinator.retrieveSegmentForId(theId.asSegmentId().toString(),
true));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]