This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new be3593f0993 Optimize unused segment query for segment allocation (#16623)
be3593f0993 is described below

commit be3593f09936eb4676912e107c94d60503906648
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Tue Jun 18 20:45:04 2024 +0530

    Optimize unused segment query for segment allocation (#16623)
---
 .../IndexerSQLMetadataStorageCoordinator.java      | 68 +++++++++++++++++-----
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 45 ++++++++++++++
 2 files changed, 98 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index b9adcd01e14..2b9f328a097 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -249,6 +249,44 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
     );
   }
 
+  List<String> retrieveUnusedSegmentIdsForExactIntervalAndVersion(
+      String dataSource,
+      Interval interval,
+      String version
+  )
+  {
+    final String sql = "SELECT id FROM %1$s"
+                       + " WHERE used = :used"
+                       + " AND dataSource = :dataSource"
+                       + " AND version = :version"
+                       + " AND start = :start AND %2$send%2$s = :end";
+
+    final List<String> matchingSegments = connector.inReadOnlyTransaction(
+        (handle, status) -> {
+          final Query<Map<String, Object>> query = handle
+              .createQuery(StringUtils.format(
+                  sql,
+                  dbTables.getSegmentsTable(),
+                  connector.getQuoteString()
+              ))
+              .setFetchSize(connector.getStreamingFetchSize())
+              .bind("used", false)
+              .bind("dataSource", dataSource)
+              .bind("version", version)
+              .bind("start", interval.getStart().toString())
+              .bind("end", interval.getEnd().toString());
+
+          try (final ResultIterator<String> iterator = query.map((index, r, ctx) -> r.getString(1)).iterator()) {
+            return ImmutableList.copyOf(iterator);
+          }
+        }
+    );
+
+    log.debug("Found [%,d] unused segments for datasource[%s] for interval[%s] and version[%s].",
+             matchingSegments.size(), dataSource, interval, version);
+    return matchingSegments;
+  }
+
   @Override
   public List<DataSegment> retrieveUnusedSegmentsForInterval(
       String dataSource,
@@ -1881,7 +1919,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
     }
 
     // If yes, try to compute allocated partition num using the max unused segment shard spec
-    SegmentIdWithShardSpec unusedMaxId = getUnusedMaxId(
+    SegmentId unusedMaxId = getUnusedMaxId(
         allocatedId.getDataSource(),
         allocatedId.getInterval(),
         allocatedId.getVersion()
@@ -1893,7 +1931,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
 
     int maxPartitionNum = Math.max(
         allocatedId.getShardSpec().getPartitionNum(),
-        unusedMaxId.getShardSpec().getPartitionNum() + 1
+        unusedMaxId.getPartitionNum() + 1
     );
     return new SegmentIdWithShardSpec(
         allocatedId.getDataSource(),
@@ -1906,25 +1944,25 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
     );
   }
 
-  private SegmentIdWithShardSpec getUnusedMaxId(String datasource, Interval interval, String version)
+  private SegmentId getUnusedMaxId(String datasource, Interval interval, String version)
   {
-    List<DataSegment> unusedSegments = retrieveUnusedSegmentsForInterval(
+    List<String> unusedSegmentIds = retrieveUnusedSegmentIdsForExactIntervalAndVersion(
         datasource,
         interval,
-        ImmutableList.of(version),
-        null,
-        null
+        version
     );
 
-    SegmentIdWithShardSpec unusedMaxId = null;
+    SegmentId unusedMaxId = null;
     int maxPartitionNum = -1;
-    for (DataSegment unusedSegment : unusedSegments) {
-      if (unusedSegment.getInterval().equals(interval)) {
-        int partitionNum = unusedSegment.getShardSpec().getPartitionNum();
-        if (maxPartitionNum < partitionNum) {
-          maxPartitionNum = partitionNum;
-          unusedMaxId = SegmentIdWithShardSpec.fromDataSegment(unusedSegment);
-        }
+    for (String id : unusedSegmentIds) {
+      final SegmentId segmentId = SegmentId.tryParse(datasource, id);
+      if (segmentId == null) {
+        continue;
+      }
+      int partitionNum = segmentId.getPartitionNum();
+      if (maxPartitionNum < partitionNum) {
+        maxPartitionNum = partitionNum;
+        unusedMaxId = segmentId;
       }
     }
     return unusedMaxId;
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 91f3279eeb6..20a74e6c026 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -3275,4 +3275,49 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
     );
     Assert.assertNull(coordinator.retrieveSegmentForId(theId.asSegmentId().toString(), true));
   }
+
+  @Test
+  public void testRetrieveUnusedSegmentsForExactIntervalAndVersion() throws Exception
+  {
+    DataSegment unusedForDifferentVersion = createSegment(
+        Intervals.of("2024/2025"),
+        "v0",
+        new NumberedShardSpec(0, 0)
+    );
+    DataSegment unusedSegmentForExactIntervalAndVersion = createSegment(
+        Intervals.of("2024/2025"),
+        "v1",
+        new NumberedShardSpec(0, 0)
+    );
+    DataSegment unusedSegmentForDifferentInterval = createSegment(
+        Intervals.of("2023/2024"),
+        "v1",
+        new NumberedShardSpec(0, 0)
+    );
+    coordinator.commitSegments(
+        ImmutableSet.of(
+            unusedForDifferentVersion,
+            unusedSegmentForDifferentInterval,
+            unusedSegmentForExactIntervalAndVersion
+        ),
+        null
+    );
+    coordinator.markSegmentsAsUnusedWithinInterval(DS.WIKI, Intervals.ETERNITY);
+
+    DataSegment usedSegmentForExactIntervalAndVersion = createSegment(
+        Intervals.of("2024/2025"),
+        "v1",
+        new NumberedShardSpec(1, 0)
+    );
+    coordinator.commitSegments(ImmutableSet.of(usedSegmentForExactIntervalAndVersion), null);
+
+
+    List<String> unusedSegmentIdsForIntervalAndVersion =
+        coordinator.retrieveUnusedSegmentIdsForExactIntervalAndVersion(DS.WIKI, Intervals.of("2024/2025"), "v1");
+    Assert.assertEquals(1, unusedSegmentIdsForIntervalAndVersion.size());
+    Assert.assertEquals(
+        unusedSegmentForExactIntervalAndVersion.getId().toString(),
+        unusedSegmentIdsForIntervalAndVersion.get(0)
+    );
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to