This is an automated email from the ASF dual-hosted git repository.
abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2136dc3591a Batch segment retrieval from the metadata store (#15305)
2136dc3591a is described below
commit 2136dc3591a74522e61ba04e39a2a4d8a1b27b5d
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Mon Nov 6 11:30:24 2023 -0800
Batch segment retrieval from the metadata store (#15305)
* Add a unit test that fails when used segments with too many intervals are
retrieved.
- This is a failing test case that needs to be ignored.
* Batch the intervals (use 100 as it's consistent with batching in other
places).
* move the filtering inside the batch
* Account for limit across the batch splits.
* Adjustments
* Fixup and add tests
* small refactor
* add more tests.
* remove wrapper.
* Minor edits
* assert out of range
---
.../druid/metadata/SqlSegmentsMetadataQuery.java | 79 +++++--
.../IndexerSQLMetadataStorageCoordinatorTest.java | 248 +++++++++++++++++++++
2 files changed, 309 insertions(+), 18 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index 76e4f974576..dd86ad6df37 100644
---
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -22,6 +22,8 @@ package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.UnmodifiableIterator;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
@@ -40,6 +42,7 @@ import org.skife.jdbi.v2.ResultIterator;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -56,6 +59,13 @@ public class SqlSegmentsMetadataQuery
{
private static final Logger log = new Logger(SqlSegmentsMetadataQuery.class);
+ /**
+ * Maximum number of intervals to consider for a batch.
+ * This is similar to {@link
IndexerSQLMetadataStorageCoordinator#MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE}, but
imposed
+ * on the intervals size.
+ */
+ private static final int MAX_INTERVALS_PER_BATCH = 100;
+
private final Handle handle;
private final SQLMetadataConnector connector;
private final MetadataStorageTablesConfig dbTables;
@@ -344,6 +354,42 @@ public class SqlSegmentsMetadataQuery
final boolean used,
@Nullable final Integer limit
)
+ {
+ if (intervals.isEmpty()) {
+ return CloseableIterators.withEmptyBaggage(
+ retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode,
used, limit)
+ );
+ } else {
+ final List<List<Interval>> intervalsLists = Lists.partition(new
ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH);
+ final List<Iterator<DataSegment>> resultingIterators = new ArrayList<>();
+ Integer limitPerBatch = limit;
+
+ for (final List<Interval> intervalList : intervalsLists) {
+ final UnmodifiableIterator<DataSegment> iterator =
retrieveSegmentsInIntervalsBatch(dataSource, intervalList, matchMode, used,
limitPerBatch);
+ if (limitPerBatch != null) {
+ // If limit is provided, we need to shrink the limit for subsequent
batches or circuit break if
+ // we have reached what was requested for.
+ final List<DataSegment> dataSegments =
ImmutableList.copyOf(iterator);
+ resultingIterators.add(dataSegments.iterator());
+ if (dataSegments.size() >= limitPerBatch) {
+ break;
+ }
+ limitPerBatch -= dataSegments.size();
+ } else {
+ resultingIterators.add(iterator);
+ }
+ }
+ return
CloseableIterators.withEmptyBaggage(Iterators.concat(resultingIterators.iterator()));
+ }
+ }
+
+ private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
+ final String dataSource,
+ final Collection<Interval> intervals,
+ final IntervalMode matchMode,
+ final boolean used,
+ @Nullable final Integer limit
+ )
{
// Check if the intervals all support comparing as strings. If so, bake
them into the SQL.
final boolean compareAsString =
intervals.stream().allMatch(Intervals::canCompareEndpointsAsStrings);
@@ -372,27 +418,24 @@ public class SqlSegmentsMetadataQuery
sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper,
r.getBytes(1), DataSegment.class))
.iterator();
- return CloseableIterators.wrap(
- Iterators.filter(
- resultIterator,
- dataSegment -> {
- if (intervals.isEmpty()) {
+ return Iterators.filter(
+ resultIterator,
+ dataSegment -> {
+ if (intervals.isEmpty()) {
+ return true;
+ } else {
+ // Must re-check that the interval matches, even if comparing as
string, because the *segment interval*
+ // might not be string-comparable. (Consider a query interval like
"2000-01-01/3000-01-01" and a
+ // segment interval like "20010/20011".)
+ for (Interval interval : intervals) {
+ if (matchMode.apply(interval, dataSegment.getInterval())) {
return true;
- } else {
- // Must re-check that the interval matches, even if comparing
as string, because the *segment interval*
- // might not be string-comparable. (Consider a query interval
like "2000-01-01/3000-01-01" and a
- // segment interval like "20010/20011".)
- for (Interval interval : intervals) {
- if (matchMode.apply(interval, dataSegment.getInterval())) {
- return true;
- }
- }
-
- return false;
}
}
- ),
- resultIterator
+
+ return false;
+ }
+ }
);
}
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index a8fc9e923c5..888c1006717 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
@@ -69,6 +70,7 @@ import org.skife.jdbi.v2.util.StringMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -1062,6 +1064,213 @@ public class IndexerSQLMetadataStorageCoordinatorTest
).containsOnlyOnce(defaultSegment3);
}
+ @Test
+ public void testRetrieveUsedSegmentsUsingMultipleIntervals() throws
IOException
+ {
+ final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
+ final List<Interval> intervals =
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList());
+
+ final Collection<DataSegment> actualUsedSegments =
coordinator.retrieveUsedSegmentsForIntervals(
+ DS.WIKI,
+ intervals,
+ Segments.ONLY_VISIBLE
+ );
+
+ Assert.assertEquals(segments.size(), actualUsedSegments.size());
+ Assert.assertTrue(actualUsedSegments.containsAll(segments));
+ }
+
+ @Test
+ public void testRetrieveAllUsedSegmentsUsingIntervalsOutOfRange() throws
IOException
+ {
+ final List<DataSegment> segments = createAndGetUsedYearSegments(1905,
1910);
+
+ final Interval outOfRangeInterval = Intervals.of("1700/1800");
+ Assert.assertTrue(segments.stream()
+ .anyMatch(segment ->
!segment.getInterval().overlaps(outOfRangeInterval)));
+
+ final Collection<DataSegment> actualUsedSegments =
coordinator.retrieveUsedSegmentsForIntervals(
+ DS.WIKI,
+ ImmutableList.of(outOfRangeInterval),
+ Segments.ONLY_VISIBLE
+ );
+
+ Assert.assertEquals(0, actualUsedSegments.size());
+ }
+
+ @Test
+ public void testRetrieveAllUsedSegmentsUsingNoIntervals() throws IOException
+ {
+ final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
+
+ final Collection<DataSegment> actualUsedSegments =
coordinator.retrieveAllUsedSegments(
+ DS.WIKI,
+ Segments.ONLY_VISIBLE
+ );
+
+ Assert.assertEquals(segments.size(), actualUsedSegments.size());
+ Assert.assertTrue(actualUsedSegments.containsAll(segments));
+ }
+
+ @Test
+ public void testRetrieveUnusedSegmentsUsingSingleIntervalAndNoLimit() throws
IOException
+ {
+ final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
+ markAllSegmentsUnused(new HashSet<>(segments));
+
+ final List<DataSegment> actualUnusedSegments =
coordinator.retrieveUnusedSegmentsForInterval(
+ DS.WIKI,
+ Intervals.of("1900/3000"),
+ null
+ );
+
+ Assert.assertEquals(segments.size(), actualUnusedSegments.size());
+ Assert.assertTrue(actualUnusedSegments.containsAll(segments));
+ }
+
+ @Test
+ public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitAtRange()
throws IOException
+ {
+ final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
+ markAllSegmentsUnused(new HashSet<>(segments));
+
+ final int requestedLimit = segments.size();
+ final List<DataSegment> actualUnusedSegments =
coordinator.retrieveUnusedSegmentsForInterval(
+ DS.WIKI,
+ Intervals.of("1900/3000"),
+ requestedLimit
+ );
+
+ Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
+ Assert.assertTrue(actualUnusedSegments.containsAll(segments));
+ }
+
+ @Test
+ public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitInRange()
throws IOException
+ {
+ final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
+ markAllSegmentsUnused(new HashSet<>(segments));
+
+ final int requestedLimit = segments.size() - 1;
+ final List<DataSegment> actualUnusedSegments =
coordinator.retrieveUnusedSegmentsForInterval(
+ DS.WIKI,
+ Intervals.of("1900/3000"),
+ requestedLimit
+ );
+
+ Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
+
Assert.assertTrue(actualUnusedSegments.containsAll(segments.stream().limit(requestedLimit).collect(Collectors.toList())));
+ }
+
+ @Test
+ public void
testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitOutOfRange() throws
IOException
+ {
+ final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
+ markAllSegmentsUnused(new HashSet<>(segments));
+
+ final int limit = segments.size() + 1;
+ final List<DataSegment> actualUnusedSegments =
coordinator.retrieveUnusedSegmentsForInterval(
+ DS.WIKI,
+ Intervals.of("1900/3000"),
+ limit
+ );
+ Assert.assertEquals(segments.size(), actualUnusedSegments.size());
+ Assert.assertTrue(actualUnusedSegments.containsAll(segments));
+ }
+
+ @Test
+ public void testRetrieveUnusedSegmentsUsingSingleIntervalOutOfRange() throws
IOException
+ {
+ final List<DataSegment> segments = createAndGetUsedYearSegments(1905,
1910);
+ markAllSegmentsUnused(new HashSet<>(segments));
+
+ final Interval outOfRangeInterval = Intervals.of("1700/1800");
+ Assert.assertTrue(segments.stream()
+ .anyMatch(segment ->
!segment.getInterval().overlaps(outOfRangeInterval)));
+ final int limit = segments.size() + 1;
+
+ final List<DataSegment> actualUnusedSegments =
coordinator.retrieveUnusedSegmentsForInterval(
+ DS.WIKI,
+ outOfRangeInterval,
+ limit
+ );
+ Assert.assertEquals(0, actualUnusedSegments.size());
+ }
+
+ @Test
+ public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndNoLimit()
throws IOException
+ {
+ final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
+ markAllSegmentsUnused(new HashSet<>(segments));
+
+ final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+ null
+ );
+ Assert.assertEquals(segments.size(), actualUnusedSegments.size());
+ Assert.assertTrue(segments.containsAll(actualUnusedSegments));
+ }
+
+ @Test
+ public void
testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitAtRange() throws
IOException
+ {
+ final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
+ markAllSegmentsUnused(new HashSet<>(segments));
+
+ final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+ segments.size()
+ );
+ Assert.assertEquals(segments.size(), actualUnusedSegments.size());
+ Assert.assertTrue(segments.containsAll(actualUnusedSegments));
+ }
+
+ @Test
+ public void
testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitInRange() throws
IOException
+ {
+ final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
+ markAllSegmentsUnused(new HashSet<>(segments));
+
+ final int requestedLimit = segments.size() - 1;
+ final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+
segments.stream().limit(requestedLimit).map(DataSegment::getInterval).collect(Collectors.toList()),
+ requestedLimit
+ );
+ Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
+
Assert.assertTrue(actualUnusedSegments.containsAll(segments.stream().limit(requestedLimit).collect(Collectors.toList())));
+ }
+
+ @Test
+ public void
testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitOutOfRange() throws
IOException
+ {
+ final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
+ markAllSegmentsUnused(new HashSet<>(segments));
+
+ final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+ segments.size() + 1
+ );
+ Assert.assertEquals(segments.size(), actualUnusedSegments.size());
+ Assert.assertTrue(actualUnusedSegments.containsAll(segments));
+ }
+
+ @Test
+ public void testRetrieveUnusedSegmentsUsingIntervalOutOfRange() throws
IOException
+ {
+ final List<DataSegment> segments = createAndGetUsedYearSegments(1905,
1910);
+ markAllSegmentsUnused(new HashSet<>(segments));
+
+ final Interval outOfRangeInterval = Intervals.of("1700/1800");
+ Assert.assertTrue(segments.stream()
+ .anyMatch(segment ->
!segment.getInterval().overlaps(outOfRangeInterval)));
+
+ final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+ ImmutableList.of(outOfRangeInterval),
+ null
+ );
+ Assert.assertEquals(0, actualUnusedSegments.size());
+ }
+
@Test
public void testSimpleUnusedList() throws IOException
{
@@ -2713,4 +2922,43 @@ public class IndexerSQLMetadataStorageCoordinatorTest
.size(100)
.build();
}
+
+ private List<DataSegment> createAndGetUsedYearSegments(final int startYear,
final int endYear) throws IOException
+ {
+ final List<DataSegment> segments = new ArrayList<>();
+
+ for (int year = startYear; year < endYear; year++) {
+ segments.add(createSegment(
+ Intervals.of("%d/%d", year, year + 1),
+ "version",
+ new LinearShardSpec(0))
+ );
+ }
+ final Set<DataSegment> segmentsSet = new HashSet<>(segments);
+ final Set<DataSegment> committedSegments =
coordinator.commitSegments(segmentsSet);
+ Assert.assertTrue(committedSegments.containsAll(new HashSet<>(segments)));
+
+ return segments;
+ }
+
+ private ImmutableList<DataSegment>
retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+ final List<Interval> intervals,
+ final Integer limit
+ )
+ {
+ return derbyConnector.inReadOnlyTransaction(
+ (handle, status) -> {
+ try (final CloseableIterator<DataSegment> iterator =
+ SqlSegmentsMetadataQuery.forHandle(
+ handle,
+ derbyConnector,
+
derbyConnectorRule.metadataTablesConfigSupplier().get(),
+ mapper
+ )
+ .retrieveUnusedSegments(DS.WIKI,
intervals, limit)) {
+ return ImmutableList.copyOf(iterator);
+ }
+ }
+ );
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]