This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a64f72c679 Lookup on incomplete partition set in 
SegmentMetadataQuerySegmentWalker (#15496)
6a64f72c679 is described below

commit 6a64f72c6798a6939106c5b39c5da6838e7240fb
Author: Rishabh Singh <[email protected]>
AuthorDate: Wed Dec 6 15:25:28 2023 +0530

    Lookup on incomplete partition set in SegmentMetadataQuerySegmentWalker 
(#15496)
    
    Description
    With CentralizedDatasourceSchema (#14989) feature enabled, metadata for 
appended segments was not being refreshed. This caused numRows to be 0 for the 
new segments and would probably cause the datasource schema to not include 
columns from the new segments.
    
    Analysis
    The problem turned out to be in the new QuerySegmentWalker implementation in 
the Coordinator. It first finds the segments to be queried in the Coordinator 
timeline, and then builds a new timeline from the segments found in that 
lookup.
    The problem was that it was looking up the complete partition set in the new 
timeline. Since the appended segments by themselves do not form a complete 
partition set, no SegmentMetadataQuery was executed for them.
---
 .../SegmentMetadataQuerySegmentWalker.java         |   2 +-
 .../SegmentMetadataQuerySegmentWalkerTest.java     | 116 +++++++++++++++++++--
 2 files changed, 108 insertions(+), 10 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentMetadataQuerySegmentWalker.java
 
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentMetadataQuerySegmentWalker.java
index 3de63c35e3a..00a015eaea8 100644
--- 
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentMetadataQuerySegmentWalker.java
+++ 
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentMetadataQuerySegmentWalker.java
@@ -201,7 +201,7 @@ public class SegmentMetadataQuerySegmentWalker implements 
QuerySegmentWalker
   )
   {
     final Function<Interval, List<TimelineObjectHolder<String, 
SegmentLoadInfo>>> lookupFn
-        = timeline::lookup;
+        = timeline::lookupWithIncompletePartitions;
 
     final List<Interval> intervals = query.getIntervals();
     List<TimelineObjectHolder<String, SegmentLoadInfo>> timelineObjectHolders =
diff --git 
a/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataQuerySegmentWalkerTest.java
 
b/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataQuerySegmentWalkerTest.java
index d51454f0a5c..7392726b577 100644
--- 
a/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataQuerySegmentWalkerTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataQuerySegmentWalkerTest.java
@@ -134,8 +134,8 @@ public class SegmentMetadataQuerySegmentWalkerTest
             timelines,
             queryRunnerMap,
             Lists.newArrayList(
-                Pair.of(Intervals.of("2011-01-01/2011-01-02"), 5),
-                Pair.of(Intervals.of("2011-01-05/2011-01-07"), 1))
+                Pair.of(Intervals.of("2011-01-01/2011-01-02"), 
Lists.newArrayList(0, 4, 5)),
+                Pair.of(Intervals.of("2011-01-05/2011-01-07"), 
Lists.newArrayList(0, 1, 1)))
         );
 
     List<SegmentDescriptor> segmentDescriptors =
@@ -148,7 +148,7 @@ public class SegmentMetadataQuerySegmentWalkerTest
                              .collect(
                                  Collectors.toList());
 
-    final SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery(
+    SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery(
         new TableDataSource(DATASOURCE),
         new MultipleSpecificSegmentSpec(
             segmentDescriptors
@@ -206,10 +206,104 @@ public class SegmentMetadataQuerySegmentWalkerTest
     Assert.assertEquals(expectedSegmentIds, actualSegmentIds);
   }
 
+  @Test
+  public void testQueryAppendedSegments() throws IOException
+  {
+    Map<DataSource, VersionedIntervalTimeline<String, SegmentLoadInfo>> 
timelines = new HashMap<>();
+    Map<String, QueryRunner> queryRunnerMap = new HashMap<>();
+
+    // populate the core partition set
+    populateTimeline(
+        timelines,
+        queryRunnerMap,
+        Collections.singletonList(
+            Pair.of(Intervals.of("2011-01-01/2011-01-02"), 
Lists.newArrayList(0, 4, 5))
+        )
+    );
+
+    queryRunnerMap.clear();
+
+    // append 2 new segments
+    Map<String, ServerExpectations> serverExpectationsMap =
+        populateTimeline(
+            timelines,
+            queryRunnerMap,
+            Collections.singletonList(
+                Pair.of(Intervals.of("2011-01-01/2011-01-02"), 
Lists.newArrayList(5, 6, 5))
+            )
+        );
+
+    List<SegmentDescriptor> segmentDescriptors =
+        serverExpectationsMap.values()
+                             .stream()
+                             .flatMap(serverExpectations -> 
Lists.newArrayList(serverExpectations.iterator()).stream())
+                             .map(
+                                 ServerExpectation::getSegment)
+                             .map(segment -> segment.getId().toDescriptor())
+                             .collect(
+                                 Collectors.toList());
+
+    SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery(
+        new TableDataSource(DATASOURCE),
+        new MultipleSpecificSegmentSpec(
+            segmentDescriptors
+        ),
+        new AllColumnIncluderator(),
+        false,
+        QueryContexts.override(
+            Collections.emptyMap(),
+            QueryContexts.BROKER_PARALLEL_MERGE_KEY,
+            false
+        ),
+        EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
+        false,
+        null,
+        null
+    );
+
+    SegmentMetadataQuerySegmentWalker walker = new 
SegmentMetadataQuerySegmentWalker(
+        new TestCoordinatorServerView(timelines, queryRunnerMap),
+        httpClientConfig,
+        warehouse,
+        new ServerConfig(),
+        new NoopServiceEmitter()
+    );
+
+    Sequence<SegmentAnalysis> resultSequence = 
walker.getQueryRunnerForSegments(
+        segmentMetadataQuery,
+        segmentDescriptors
+    ).run(QueryPlus.wrap(segmentMetadataQuery));
+
+    Yielder<SegmentAnalysis> yielder = Yielders.each(resultSequence);
+    Set<String> actualSegmentIds = new HashSet<>();
+    try {
+      while (!yielder.isDone()) {
+        final SegmentAnalysis analysis = yielder.get();
+        actualSegmentIds.add(analysis.getId());
+        yielder = yielder.next(null);
+      }
+    }
+    finally {
+      yielder.close();
+    }
+
+    Set<String> expectedSegmentIds =
+        serverExpectationsMap.values()
+                             .stream()
+                             .flatMap(serverExpectations -> Lists.newArrayList(
+                                 serverExpectations.iterator()).stream())
+                             .map(
+                                 ServerExpectation::getSegment)
+                             .map(segment -> segment.getId().toString())
+                             .collect(
+                                 Collectors.toSet());
+    Assert.assertEquals(expectedSegmentIds, actualSegmentIds);
+  }
+
   private Map<String, ServerExpectations> populateTimeline(
       final Map<DataSource, VersionedIntervalTimeline<String, 
SegmentLoadInfo>> timelines,
       final Map<String, QueryRunner> queryRunnerMap,
-      final List<Pair<Interval, Integer>> intervalAndChunks
+      final List<Pair<Interval, List<Integer>>> intervalAndChunks
   )
   {
     VersionedIntervalTimeline<String, SegmentLoadInfo> timeline = new 
VersionedIntervalTimeline<>(Comparator.naturalOrder());
@@ -217,17 +311,21 @@ public class SegmentMetadataQuerySegmentWalkerTest
 
     Map<String, ServerExpectations> serverExpectationsMap = new HashMap<>();
 
-    for (Pair<Interval, Integer> intervalAndChunk : intervalAndChunks) {
-      for (int partitionNum = 0; partitionNum < intervalAndChunk.rhs; 
partitionNum++) {
+    for (Pair<Interval, List<Integer>> intervalAndChunk : intervalAndChunks) {
+      List<Integer> partitionDetails = intervalAndChunk.rhs;
+      int startNum = partitionDetails.get(0);
+      int endNum = partitionDetails.get(1);
+      int corePartitions = partitionDetails.get(2);
+      for (int partitionNum = startNum; partitionNum <= endNum; 
partitionNum++) {
 
         Interval interval = intervalAndChunk.lhs;
-        int numChunks = intervalAndChunk.rhs;
+        int numChunks = endNum - startNum + 1;
         SegmentId segmentId = SegmentId.of(DATASOURCE, interval, "0", 
partitionNum);
 
         DataSegment mockSegment = EasyMock.mock(DataSegment.class);
 
         final ShardSpec shardSpec;
-        if (numChunks == 1) {
+        if (corePartitions == 1) {
           shardSpec = new SingleDimensionShardSpec("dimAll", null, null, 0, 1);
         } else {
           String start = null;
@@ -238,7 +336,7 @@ public class SegmentMetadataQuerySegmentWalkerTest
           if (partitionNum + 1 < numChunks) {
             end = String.valueOf(partitionNum + 1);
           }
-          shardSpec = new SingleDimensionShardSpec("dim", start, end, 
partitionNum, numChunks);
+          shardSpec = new SingleDimensionShardSpec("dim", start, end, 
partitionNum, corePartitions);
         }
 
         ServerExpectation<Object> expectation = new ServerExpectation<>(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to