This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2136dc3591a Batch segment retrieval from the metadata store (#15305)
2136dc3591a is described below

commit 2136dc3591a74522e61ba04e39a2a4d8a1b27b5d
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Mon Nov 6 11:30:24 2023 -0800

    Batch segment retrieval from the metadata store (#15305)
    
    * Add a unit test that fails when used segments with too many intervals are 
retrieved.
    
    - This is a failing test case that needs to be ignored.
    
    * Batch the intervals (use 100 as it's consistent with batching in other 
places).
    
    * move the filtering inside the batch
    
    * Account for the limit across batch splits.
    
    * Adjustments
    
    * Fixup and add tests
    
    * small refactor
    
    * add more tests.
    
    * remove wrapper.
    
    * Minor edits
    
    * assert out of range
---
 .../druid/metadata/SqlSegmentsMetadataQuery.java   |  79 +++++--
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 248 +++++++++++++++++++++
 2 files changed, 309 insertions(+), 18 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index 76e4f974576..dd86ad6df37 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -22,6 +22,8 @@ package org.apache.druid.metadata;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.UnmodifiableIterator;
 import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
@@ -40,6 +42,7 @@ import org.skife.jdbi.v2.ResultIterator;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -56,6 +59,13 @@ public class SqlSegmentsMetadataQuery
 {
   private static final Logger log = new Logger(SqlSegmentsMetadataQuery.class);
 
+  /**
+   * Maximum number of intervals to consider for a batch.
+   * This is similar to {@link 
IndexerSQLMetadataStorageCoordinator#MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE}, but 
imposed
+   * on the intervals size.
+   */
+  private static final int MAX_INTERVALS_PER_BATCH = 100;
+
   private final Handle handle;
   private final SQLMetadataConnector connector;
   private final MetadataStorageTablesConfig dbTables;
@@ -344,6 +354,42 @@ public class SqlSegmentsMetadataQuery
       final boolean used,
       @Nullable final Integer limit
   )
+  {
+    if (intervals.isEmpty()) {
+      return CloseableIterators.withEmptyBaggage(
+          retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, 
used, limit)
+      );
+    } else {
+      final List<List<Interval>> intervalsLists = Lists.partition(new 
ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH);
+      final List<Iterator<DataSegment>> resultingIterators = new ArrayList<>();
+      Integer limitPerBatch = limit;
+
+      for (final List<Interval> intervalList : intervalsLists) {
+        final UnmodifiableIterator<DataSegment> iterator = 
retrieveSegmentsInIntervalsBatch(dataSource, intervalList, matchMode, used, 
limitPerBatch);
+        if (limitPerBatch != null) {
+          // If limit is provided, we need to shrink the limit for subsequent 
batches or circuit break if
+          // we have reached what was requested for.
+          final List<DataSegment> dataSegments = 
ImmutableList.copyOf(iterator);
+          resultingIterators.add(dataSegments.iterator());
+          if (dataSegments.size() >= limitPerBatch) {
+            break;
+          }
+          limitPerBatch -= dataSegments.size();
+        } else {
+          resultingIterators.add(iterator);
+        }
+      }
+      return 
CloseableIterators.withEmptyBaggage(Iterators.concat(resultingIterators.iterator()));
+    }
+  }
+
+  private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
+      final String dataSource,
+      final Collection<Interval> intervals,
+      final IntervalMode matchMode,
+      final boolean used,
+      @Nullable final Integer limit
+  )
   {
     // Check if the intervals all support comparing as strings. If so, bake 
them into the SQL.
     final boolean compareAsString = 
intervals.stream().allMatch(Intervals::canCompareEndpointsAsStrings);
@@ -372,27 +418,24 @@ public class SqlSegmentsMetadataQuery
         sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, 
r.getBytes(1), DataSegment.class))
            .iterator();
 
-    return CloseableIterators.wrap(
-        Iterators.filter(
-            resultIterator,
-            dataSegment -> {
-              if (intervals.isEmpty()) {
+    return Iterators.filter(
+        resultIterator,
+        dataSegment -> {
+          if (intervals.isEmpty()) {
+            return true;
+          } else {
+            // Must re-check that the interval matches, even if comparing as 
string, because the *segment interval*
+            // might not be string-comparable. (Consider a query interval like 
"2000-01-01/3000-01-01" and a
+            // segment interval like "20010/20011".)
+            for (Interval interval : intervals) {
+              if (matchMode.apply(interval, dataSegment.getInterval())) {
                 return true;
-              } else {
-                // Must re-check that the interval matches, even if comparing 
as string, because the *segment interval*
-                // might not be string-comparable. (Consider a query interval 
like "2000-01-01/3000-01-01" and a
-                // segment interval like "20010/20011".)
-                for (Interval interval : intervals) {
-                  if (matchMode.apply(interval, dataSegment.getInterval())) {
-                    return true;
-                  }
-                }
-
-                return false;
               }
             }
-        ),
-        resultIterator
+
+            return false;
+          }
+        }
     );
   }
 
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index a8fc9e923c5..888c1006717 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
@@ -69,6 +70,7 @@ import org.skife.jdbi.v2.util.StringMapper;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -1062,6 +1064,213 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     ).containsOnlyOnce(defaultSegment3);
   }
 
+  @Test
+  public void testRetrieveUsedSegmentsUsingMultipleIntervals() throws 
IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 
2133);
+    final List<Interval> intervals = 
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList());
+
+    final Collection<DataSegment> actualUsedSegments = 
coordinator.retrieveUsedSegmentsForIntervals(
+        DS.WIKI,
+        intervals,
+        Segments.ONLY_VISIBLE
+    );
+
+    Assert.assertEquals(segments.size(), actualUsedSegments.size());
+    Assert.assertTrue(actualUsedSegments.containsAll(segments));
+  }
+
+  @Test
+  public void testRetrieveAllUsedSegmentsUsingIntervalsOutOfRange() throws 
IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1905, 
1910);
+
+    final Interval outOfRangeInterval = Intervals.of("1700/1800");
+    Assert.assertTrue(segments.stream()
+                              .anyMatch(segment -> 
!segment.getInterval().overlaps(outOfRangeInterval)));
+
+    final Collection<DataSegment> actualUsedSegments = 
coordinator.retrieveUsedSegmentsForIntervals(
+        DS.WIKI,
+        ImmutableList.of(outOfRangeInterval),
+        Segments.ONLY_VISIBLE
+    );
+
+    Assert.assertEquals(0, actualUsedSegments.size());
+  }
+
+  @Test
+  public void testRetrieveAllUsedSegmentsUsingNoIntervals() throws IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 
2133);
+
+    final Collection<DataSegment> actualUsedSegments = 
coordinator.retrieveAllUsedSegments(
+        DS.WIKI,
+        Segments.ONLY_VISIBLE
+    );
+
+    Assert.assertEquals(segments.size(), actualUsedSegments.size());
+    Assert.assertTrue(actualUsedSegments.containsAll(segments));
+  }
+
+  @Test
+  public void testRetrieveUnusedSegmentsUsingSingleIntervalAndNoLimit() throws 
IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 
2133);
+    markAllSegmentsUnused(new HashSet<>(segments));
+
+    final List<DataSegment> actualUnusedSegments = 
coordinator.retrieveUnusedSegmentsForInterval(
+        DS.WIKI,
+        Intervals.of("1900/3000"),
+        null
+    );
+
+    Assert.assertEquals(segments.size(), actualUnusedSegments.size());
+    Assert.assertTrue(actualUnusedSegments.containsAll(segments));
+  }
+
+  @Test
+  public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitAtRange() 
throws IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 
2133);
+    markAllSegmentsUnused(new HashSet<>(segments));
+
+    final int requestedLimit = segments.size();
+    final List<DataSegment> actualUnusedSegments = 
coordinator.retrieveUnusedSegmentsForInterval(
+        DS.WIKI,
+        Intervals.of("1900/3000"),
+        requestedLimit
+    );
+
+    Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
+    Assert.assertTrue(actualUnusedSegments.containsAll(segments));
+  }
+
+  @Test
+  public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitInRange() 
throws IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 
2133);
+    markAllSegmentsUnused(new HashSet<>(segments));
+
+    final int requestedLimit = segments.size() - 1;
+    final List<DataSegment> actualUnusedSegments = 
coordinator.retrieveUnusedSegmentsForInterval(
+        DS.WIKI,
+        Intervals.of("1900/3000"),
+        requestedLimit
+    );
+
+    Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
+    
Assert.assertTrue(actualUnusedSegments.containsAll(segments.stream().limit(requestedLimit).collect(Collectors.toList())));
+  }
+
+  @Test
+  public void 
testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitOutOfRange() throws 
IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 
2133);
+    markAllSegmentsUnused(new HashSet<>(segments));
+
+    final int limit = segments.size() + 1;
+    final List<DataSegment> actualUnusedSegments = 
coordinator.retrieveUnusedSegmentsForInterval(
+        DS.WIKI,
+        Intervals.of("1900/3000"),
+        limit
+    );
+    Assert.assertEquals(segments.size(), actualUnusedSegments.size());
+    Assert.assertTrue(actualUnusedSegments.containsAll(segments));
+  }
+
+  @Test
+  public void testRetrieveUnusedSegmentsUsingSingleIntervalOutOfRange() throws 
IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1905, 
1910);
+    markAllSegmentsUnused(new HashSet<>(segments));
+
+    final Interval outOfRangeInterval = Intervals.of("1700/1800");
+    Assert.assertTrue(segments.stream()
+                              .anyMatch(segment -> 
!segment.getInterval().overlaps(outOfRangeInterval)));
+    final int limit = segments.size() + 1;
+
+    final List<DataSegment> actualUnusedSegments = 
coordinator.retrieveUnusedSegmentsForInterval(
+        DS.WIKI,
+        outOfRangeInterval,
+        limit
+    );
+    Assert.assertEquals(0, actualUnusedSegments.size());
+  }
+
+  @Test
+  public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndNoLimit() 
throws IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 
2133);
+    markAllSegmentsUnused(new HashSet<>(segments));
+
+    final ImmutableList<DataSegment> actualUnusedSegments = 
retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+        
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+        null
+    );
+    Assert.assertEquals(segments.size(), actualUnusedSegments.size());
+    Assert.assertTrue(segments.containsAll(actualUnusedSegments));
+  }
+
+  @Test
+  public void 
testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitAtRange() throws 
IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 
2133);
+    markAllSegmentsUnused(new HashSet<>(segments));
+
+    final ImmutableList<DataSegment> actualUnusedSegments = 
retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+        
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+        segments.size()
+    );
+    Assert.assertEquals(segments.size(), actualUnusedSegments.size());
+    Assert.assertTrue(segments.containsAll(actualUnusedSegments));
+  }
+
+  @Test
+  public void 
testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitInRange() throws 
IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 
2133);
+    markAllSegmentsUnused(new HashSet<>(segments));
+
+    final int requestedLimit = segments.size() - 1;
+    final ImmutableList<DataSegment> actualUnusedSegments = 
retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+        
segments.stream().limit(requestedLimit).map(DataSegment::getInterval).collect(Collectors.toList()),
+        requestedLimit
+    );
+    Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
+    
Assert.assertTrue(actualUnusedSegments.containsAll(segments.stream().limit(requestedLimit).collect(Collectors.toList())));
+  }
+
+  @Test
+  public void 
testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitOutOfRange() throws 
IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 
2133);
+    markAllSegmentsUnused(new HashSet<>(segments));
+
+    final ImmutableList<DataSegment> actualUnusedSegments = 
retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+        
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+        segments.size() + 1
+    );
+    Assert.assertEquals(segments.size(), actualUnusedSegments.size());
+    Assert.assertTrue(actualUnusedSegments.containsAll(segments));
+  }
+
+  @Test
+  public void testRetrieveUnusedSegmentsUsingIntervalOutOfRange() throws 
IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1905, 
1910);
+    markAllSegmentsUnused(new HashSet<>(segments));
+
+    final Interval outOfRangeInterval = Intervals.of("1700/1800");
+    Assert.assertTrue(segments.stream()
+                              .anyMatch(segment -> 
!segment.getInterval().overlaps(outOfRangeInterval)));
+
+    final ImmutableList<DataSegment> actualUnusedSegments = 
retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+        ImmutableList.of(outOfRangeInterval),
+        null
+    );
+    Assert.assertEquals(0, actualUnusedSegments.size());
+  }
+
   @Test
   public void testSimpleUnusedList() throws IOException
   {
@@ -2713,4 +2922,43 @@ public class IndexerSQLMetadataStorageCoordinatorTest
                       .size(100)
                       .build();
   }
+
+  private List<DataSegment> createAndGetUsedYearSegments(final int startYear, 
final int endYear) throws IOException
+  {
+    final List<DataSegment> segments = new ArrayList<>();
+
+    for (int year = startYear; year < endYear; year++) {
+      segments.add(createSegment(
+          Intervals.of("%d/%d", year, year + 1),
+          "version",
+          new LinearShardSpec(0))
+      );
+    }
+    final Set<DataSegment> segmentsSet = new HashSet<>(segments);
+    final Set<DataSegment> committedSegments = 
coordinator.commitSegments(segmentsSet);
+    Assert.assertTrue(committedSegments.containsAll(new HashSet<>(segments)));
+
+    return segments;
+  }
+
+  private ImmutableList<DataSegment> 
retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+      final List<Interval> intervals,
+      final Integer limit
+  )
+  {
+    return derbyConnector.inReadOnlyTransaction(
+        (handle, status) -> {
+          try (final CloseableIterator<DataSegment> iterator =
+                   SqlSegmentsMetadataQuery.forHandle(
+                                               handle,
+                                               derbyConnector,
+                                               
derbyConnectorRule.metadataTablesConfigSupplier().get(),
+                                               mapper
+                                           )
+                                           .retrieveUnusedSegments(DS.WIKI, 
intervals, limit)) {
+            return ImmutableList.copyOf(iterator);
+          }
+        }
+    );
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to