This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6a64f72c679 Lookup on incomplete partition set in
SegmentMetadataQuerySegmentWalker (#15496)
6a64f72c679 is described below
commit 6a64f72c6798a6939106c5b39c5da6838e7240fb
Author: Rishabh Singh <[email protected]>
AuthorDate: Wed Dec 6 15:25:28 2023 +0530
Lookup on incomplete partition set in SegmentMetadataQuerySegmentWalker
(#15496)
Description
With the CentralizedDatasourceSchema (#14989) feature enabled, metadata for
appended segments was not being refreshed. This caused numRows to be 0 for the
new segments and would likely also cause the datasource schema to omit
columns from the new segments.
Analysis
The problem turned out to be in the new QuerySegmentWalker implementation in
the Coordinator. It first finds the segments to be queried in the Coordinator
timeline, and then builds a new timeline containing just those segments.
The problem was that it was looking up the complete partition set in the new
timeline. Since the appended segments by themselves do not make up a complete
partition set, no SegmentMetadataQuery was executed.
---
.../SegmentMetadataQuerySegmentWalker.java | 2 +-
.../SegmentMetadataQuerySegmentWalkerTest.java | 116 +++++++++++++++++++--
2 files changed, 108 insertions(+), 10 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentMetadataQuerySegmentWalker.java
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentMetadataQuerySegmentWalker.java
index 3de63c35e3a..00a015eaea8 100644
---
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentMetadataQuerySegmentWalker.java
+++
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentMetadataQuerySegmentWalker.java
@@ -201,7 +201,7 @@ public class SegmentMetadataQuerySegmentWalker implements
QuerySegmentWalker
)
{
final Function<Interval, List<TimelineObjectHolder<String,
SegmentLoadInfo>>> lookupFn
- = timeline::lookup;
+ = timeline::lookupWithIncompletePartitions;
final List<Interval> intervals = query.getIntervals();
List<TimelineObjectHolder<String, SegmentLoadInfo>> timelineObjectHolders =
diff --git
a/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataQuerySegmentWalkerTest.java
b/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataQuerySegmentWalkerTest.java
index d51454f0a5c..7392726b577 100644
---
a/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataQuerySegmentWalkerTest.java
+++
b/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataQuerySegmentWalkerTest.java
@@ -134,8 +134,8 @@ public class SegmentMetadataQuerySegmentWalkerTest
timelines,
queryRunnerMap,
Lists.newArrayList(
- Pair.of(Intervals.of("2011-01-01/2011-01-02"), 5),
- Pair.of(Intervals.of("2011-01-05/2011-01-07"), 1))
+ Pair.of(Intervals.of("2011-01-01/2011-01-02"),
Lists.newArrayList(0, 4, 5)),
+ Pair.of(Intervals.of("2011-01-05/2011-01-07"),
Lists.newArrayList(0, 1, 1)))
);
List<SegmentDescriptor> segmentDescriptors =
@@ -148,7 +148,7 @@ public class SegmentMetadataQuerySegmentWalkerTest
.collect(
Collectors.toList());
- final SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery(
+ SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery(
new TableDataSource(DATASOURCE),
new MultipleSpecificSegmentSpec(
segmentDescriptors
@@ -206,10 +206,104 @@ public class SegmentMetadataQuerySegmentWalkerTest
Assert.assertEquals(expectedSegmentIds, actualSegmentIds);
}
+ @Test
+ public void testQueryAppendedSegments() throws IOException
+ {
+ Map<DataSource, VersionedIntervalTimeline<String, SegmentLoadInfo>>
timelines = new HashMap<>();
+ Map<String, QueryRunner> queryRunnerMap = new HashMap<>();
+
+ // populate the core partition set
+ populateTimeline(
+ timelines,
+ queryRunnerMap,
+ Collections.singletonList(
+ Pair.of(Intervals.of("2011-01-01/2011-01-02"),
Lists.newArrayList(0, 4, 5))
+ )
+ );
+
+ queryRunnerMap.clear();
+
+ // append 2 new segments
+ Map<String, ServerExpectations> serverExpectationsMap =
+ populateTimeline(
+ timelines,
+ queryRunnerMap,
+ Collections.singletonList(
+ Pair.of(Intervals.of("2011-01-01/2011-01-02"),
Lists.newArrayList(5, 6, 5))
+ )
+ );
+
+ List<SegmentDescriptor> segmentDescriptors =
+ serverExpectationsMap.values()
+ .stream()
+ .flatMap(serverExpectations ->
Lists.newArrayList(serverExpectations.iterator()).stream())
+ .map(
+ ServerExpectation::getSegment)
+ .map(segment -> segment.getId().toDescriptor())
+ .collect(
+ Collectors.toList());
+
+ SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery(
+ new TableDataSource(DATASOURCE),
+ new MultipleSpecificSegmentSpec(
+ segmentDescriptors
+ ),
+ new AllColumnIncluderator(),
+ false,
+ QueryContexts.override(
+ Collections.emptyMap(),
+ QueryContexts.BROKER_PARALLEL_MERGE_KEY,
+ false
+ ),
+ EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
+ false,
+ null,
+ null
+ );
+
+ SegmentMetadataQuerySegmentWalker walker = new
SegmentMetadataQuerySegmentWalker(
+ new TestCoordinatorServerView(timelines, queryRunnerMap),
+ httpClientConfig,
+ warehouse,
+ new ServerConfig(),
+ new NoopServiceEmitter()
+ );
+
+ Sequence<SegmentAnalysis> resultSequence =
walker.getQueryRunnerForSegments(
+ segmentMetadataQuery,
+ segmentDescriptors
+ ).run(QueryPlus.wrap(segmentMetadataQuery));
+
+ Yielder<SegmentAnalysis> yielder = Yielders.each(resultSequence);
+ Set<String> actualSegmentIds = new HashSet<>();
+ try {
+ while (!yielder.isDone()) {
+ final SegmentAnalysis analysis = yielder.get();
+ actualSegmentIds.add(analysis.getId());
+ yielder = yielder.next(null);
+ }
+ }
+ finally {
+ yielder.close();
+ }
+
+ Set<String> expectedSegmentIds =
+ serverExpectationsMap.values()
+ .stream()
+ .flatMap(serverExpectations -> Lists.newArrayList(
+ serverExpectations.iterator()).stream())
+ .map(
+ ServerExpectation::getSegment)
+ .map(segment -> segment.getId().toString())
+ .collect(
+ Collectors.toSet());
+ Assert.assertEquals(expectedSegmentIds, actualSegmentIds);
+ }
+
private Map<String, ServerExpectations> populateTimeline(
final Map<DataSource, VersionedIntervalTimeline<String,
SegmentLoadInfo>> timelines,
final Map<String, QueryRunner> queryRunnerMap,
- final List<Pair<Interval, Integer>> intervalAndChunks
+ final List<Pair<Interval, List<Integer>>> intervalAndChunks
)
{
VersionedIntervalTimeline<String, SegmentLoadInfo> timeline = new
VersionedIntervalTimeline<>(Comparator.naturalOrder());
@@ -217,17 +311,21 @@ public class SegmentMetadataQuerySegmentWalkerTest
Map<String, ServerExpectations> serverExpectationsMap = new HashMap<>();
- for (Pair<Interval, Integer> intervalAndChunk : intervalAndChunks) {
- for (int partitionNum = 0; partitionNum < intervalAndChunk.rhs;
partitionNum++) {
+ for (Pair<Interval, List<Integer>> intervalAndChunk : intervalAndChunks) {
+ List<Integer> partitionDetails = intervalAndChunk.rhs;
+ int startNum = partitionDetails.get(0);
+ int endNum = partitionDetails.get(1);
+ int corePartitions = partitionDetails.get(2);
+ for (int partitionNum = startNum; partitionNum <= endNum;
partitionNum++) {
Interval interval = intervalAndChunk.lhs;
- int numChunks = intervalAndChunk.rhs;
+ int numChunks = endNum - startNum + 1;
SegmentId segmentId = SegmentId.of(DATASOURCE, interval, "0",
partitionNum);
DataSegment mockSegment = EasyMock.mock(DataSegment.class);
final ShardSpec shardSpec;
- if (numChunks == 1) {
+ if (corePartitions == 1) {
shardSpec = new SingleDimensionShardSpec("dimAll", null, null, 0, 1);
} else {
String start = null;
@@ -238,7 +336,7 @@ public class SegmentMetadataQuerySegmentWalkerTest
if (partitionNum + 1 < numChunks) {
end = String.valueOf(partitionNum + 1);
}
- shardSpec = new SingleDimensionShardSpec("dim", start, end,
partitionNum, numChunks);
+ shardSpec = new SingleDimensionShardSpec("dim", start, end,
partitionNum, corePartitions);
}
ServerExpectation<Object> expectation = new ServerExpectation<>(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]