This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new be3593f0993 Optimize unused segment query for segment allocation (#16623)
be3593f0993 is described below
commit be3593f09936eb4676912e107c94d60503906648
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Tue Jun 18 20:45:04 2024 +0530
Optimize unused segment query for segment allocation (#16623)
---
.../IndexerSQLMetadataStorageCoordinator.java | 68 +++++++++++++++++-----
.../IndexerSQLMetadataStorageCoordinatorTest.java | 45 ++++++++++++++
2 files changed, 98 insertions(+), 15 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index b9adcd01e14..2b9f328a097 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -249,6 +249,44 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
);
}
+ /**
+ * Retrieves the IDs of unused segments (rows bound with {@code used = false}) of the given
+ * datasource whose version equals {@code version} and whose start and end match the given
+ * interval exactly.
+ *
+ * <p>Only the {@code id} column is selected, so segment payloads are not read. This is cheaper
+ * than fetching full segments and filtering them by interval in Java. Start and end are bound as
+ * {@code interval.getStart().toString()} / {@code interval.getEnd().toString()} and compared
+ * with {@code =}. Matching therefore relies on the stored strings having the same format
+ * (NOTE(review): assumed to be the form produced by {@code DateTime.toString()}; confirm
+ * against how segments are inserted).
+ *
+ * <p>Package-private. It is called by {@code getUnusedMaxId} and directly by
+ * {@code IndexerSQLMetadataStorageCoordinatorTest}.
+ *
+ * @param dataSource datasource whose segments are searched
+ * @param interval   interval whose start and end must match exactly
+ * @param version    segment version to match
+ * @return immutable list of matching segment ID strings; empty if none match
+ */
+ List<String> retrieveUnusedSegmentIdsForExactIntervalAndVersion(
+ String dataSource,
+ Interval interval,
+ String version
+ )
+ {
+ // %1$s is replaced with the segments table; %2$s with the connector's quote string, which
+ // wraps the "end" column name (presumably because END is a reserved SQL keyword).
+ final String sql = "SELECT id FROM %1$s"
+ + " WHERE used = :used"
+ + " AND dataSource = :dataSource"
+ + " AND version = :version"
+ + " AND start = :start AND %2$send%2$s = :end";
+
+ // Runs in a read-only transaction. Rows are fetched with the connector's streaming fetch size
+ // and copied into an immutable list before the transaction callback returns.
+ final List<String> matchingSegments = connector.inReadOnlyTransaction(
+ (handle, status) -> {
+ final Query<Map<String, Object>> query = handle
+ .createQuery(StringUtils.format(
+ sql,
+ dbTables.getSegmentsTable(),
+ connector.getQuoteString()
+ ))
+ .setFetchSize(connector.getStreamingFetchSize())
+ .bind("used", false)
+ .bind("dataSource", dataSource)
+ .bind("version", version)
+ .bind("start", interval.getStart().toString())
+ .bind("end", interval.getEnd().toString());
+
+ // Each row maps to its first (and only) column, the segment id. The iterator is closed
+ // by try-with-resources.
+ try (final ResultIterator<String> iterator = query.map((index, r,
ctx) -> r.getString(1)).iterator()) {
+ return ImmutableList.copyOf(iterator);
+ }
+ }
+ );
+
+ log.debug("Found [%,d] unused segments for datasource[%s] for interval[%s]
and version[%s].",
+ matchingSegments.size(), dataSource, interval, version);
+ return matchingSegments;
+ }
+
@Override
public List<DataSegment> retrieveUnusedSegmentsForInterval(
String dataSource,
@@ -1881,7 +1919,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
// If yes, try to compute allocated partition num using the max unused
segment shard spec
- SegmentIdWithShardSpec unusedMaxId = getUnusedMaxId(
+ SegmentId unusedMaxId = getUnusedMaxId(
allocatedId.getDataSource(),
allocatedId.getInterval(),
allocatedId.getVersion()
@@ -1893,7 +1931,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
int maxPartitionNum = Math.max(
allocatedId.getShardSpec().getPartitionNum(),
- unusedMaxId.getShardSpec().getPartitionNum() + 1
+ unusedMaxId.getPartitionNum() + 1
);
return new SegmentIdWithShardSpec(
allocatedId.getDataSource(),
@@ -1906,25 +1944,25 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
);
}
- private SegmentIdWithShardSpec getUnusedMaxId(String datasource, Interval
interval, String version)
+ private SegmentId getUnusedMaxId(String datasource, Interval interval,
String version)
{
- List<DataSegment> unusedSegments = retrieveUnusedSegmentsForInterval(
+ List<String> unusedSegmentIds =
retrieveUnusedSegmentIdsForExactIntervalAndVersion(
datasource,
interval,
- ImmutableList.of(version),
- null,
- null
+ version
);
- SegmentIdWithShardSpec unusedMaxId = null;
+ SegmentId unusedMaxId = null;
int maxPartitionNum = -1;
- for (DataSegment unusedSegment : unusedSegments) {
- if (unusedSegment.getInterval().equals(interval)) {
- int partitionNum = unusedSegment.getShardSpec().getPartitionNum();
- if (maxPartitionNum < partitionNum) {
- maxPartitionNum = partitionNum;
- unusedMaxId = SegmentIdWithShardSpec.fromDataSegment(unusedSegment);
- }
+ for (String id : unusedSegmentIds) {
+ final SegmentId segmentId = SegmentId.tryParse(datasource, id);
+ if (segmentId == null) {
+ continue;
+ }
+ int partitionNum = segmentId.getPartitionNum();
+ if (maxPartitionNum < partitionNum) {
+ maxPartitionNum = partitionNum;
+ unusedMaxId = segmentId;
}
}
return unusedMaxId;
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 91f3279eeb6..20a74e6c026 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -3275,4 +3275,49 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
);
Assert.assertNull(coordinator.retrieveSegmentForId(theId.asSegmentId().toString(),
true));
}
+
+ /**
+ * Verifies that {@code retrieveUnusedSegmentIdsForExactIntervalAndVersion} returns only the ID of
+ * the unused segment whose interval and version both match exactly. It must exclude:
+ * an unused segment with a different version, an unused segment with a different interval,
+ * and a used segment with the same interval and version.
+ *
+ * <p>NOTE(review): the test name says "Segments", but the method under test returns segment IDs.
+ */
+ @Test
+ public void testRetrieveUnusedSegmentsForExactIntervalAndVersion() throws
Exception
+ {
+ // Three segments that will become unused: wrong version (v0), exact match (2024/2025, v1),
+ // and wrong interval (2023/2024, v1).
+ DataSegment unusedForDifferentVersion = createSegment(
+ Intervals.of("2024/2025"),
+ "v0",
+ new NumberedShardSpec(0, 0)
+ );
+ DataSegment unusedSegmentForExactIntervalAndVersion = createSegment(
+ Intervals.of("2024/2025"),
+ "v1",
+ new NumberedShardSpec(0, 0)
+ );
+ DataSegment unusedSegmentForDifferentInterval = createSegment(
+ Intervals.of("2023/2024"),
+ "v1",
+ new NumberedShardSpec(0, 0)
+ );
+ // Commit all three, then mark DS.WIKI segments within ETERNITY as unused
+ // (createSegment is assumed to create DS.WIKI segments).
+ coordinator.commitSegments(
+ ImmutableSet.of(
+ unusedForDifferentVersion,
+ unusedSegmentForDifferentInterval,
+ unusedSegmentForExactIntervalAndVersion
+ ),
+ null
+ );
+ coordinator.markSegmentsAsUnusedWithinInterval(DS.WIKI,
Intervals.ETERNITY);
+
+ // A used segment (partition 1) with the same interval and version is committed after the
+ // mark-unused step. It must not appear in the result.
+ DataSegment usedSegmentForExactIntervalAndVersion = createSegment(
+ Intervals.of("2024/2025"),
+ "v1",
+ new NumberedShardSpec(1, 0)
+ );
+
coordinator.commitSegments(ImmutableSet.of(usedSegmentForExactIntervalAndVersion),
null);
+
+
+ // Query the exact interval 2024/2025 and version v1.
+ List<String> unusedSegmentIdsForIntervalAndVersion =
+
coordinator.retrieveUnusedSegmentIdsForExactIntervalAndVersion(DS.WIKI,
Intervals.of("2024/2025"), "v1");
+ // Only the unused exact-match segment's ID is expected.
+ Assert.assertEquals(1, unusedSegmentIdsForIntervalAndVersion.size());
+ Assert.assertEquals(
+ unusedSegmentForExactIntervalAndVersion.getId().toString(),
+ unusedSegmentIdsForIntervalAndVersion.get(0)
+ );
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]