This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch 0.19.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/0.19.0 by this push:
     new ce61f49  Fix CachingClusteredClient when querying specific segments (#10125) (#10132)
ce61f49 is described below

commit ce61f49cc55cd775cfd359d871253e1110daec7e
Author: Jihoon Son <[email protected]>
AuthorDate: Thu Jul 2 19:46:05 2020 -0700

    Fix CachingClusteredClient when querying specific segments (#10125) (#10132)
    
    * Fix CachingClusteredClient when querying specific segments
    
    * delete useless test
    
    * roll back timeout
---
 .../druid/client/CachingClusteredClient.java       | 27 +++++++++----
 .../org/apache/druid/client/TestHttpClient.java    |  2 +-
 .../apache/druid/query/RetryQueryRunnerTest.java   | 45 ++++++++++++++++------
 3 files changed, 54 insertions(+), 20 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 2922f2f..8dba7de 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -175,7 +175,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
       @Override
       public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
       {
-        return CachingClusteredClient.this.run(queryPlus, responseContext, timeline -> timeline);
+        return CachingClusteredClient.this.run(queryPlus, responseContext, timeline -> timeline, false);
       }
     };
   }
@@ -187,10 +187,12 @@ public class CachingClusteredClient implements QuerySegmentWalker
   private <T> Sequence<T> run(
       final QueryPlus<T> queryPlus,
       final ResponseContext responseContext,
-      final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter
+      final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter,
+      final boolean specificSegments
   )
   {
-    final ClusterQueryResult<T> result = new SpecificQueryRunnable<>(queryPlus, responseContext).run(timelineConverter);
+    final ClusterQueryResult<T> result = new SpecificQueryRunnable<>(queryPlus, responseContext)
+        .run(timelineConverter, specificSegments);
     initializeNumRemainingResponsesInResponseContext(queryPlus.getQuery(), responseContext, result.numQueryServers);
     return result.sequence;
   }
@@ -231,7 +233,8 @@ public class CachingClusteredClient implements QuerySegmentWalker
                 }
               }
               return timeline2;
-            }
+            },
+            true
         );
       }
     };
@@ -321,7 +324,10 @@ public class CachingClusteredClient implements QuerySegmentWalker
      * @return a pair of a sequence merging results from remote query servers and the number of remote servers
      *         participating in query processing.
      */
-    ClusterQueryResult<T> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
+    ClusterQueryResult<T> run(
+        final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter,
+        final boolean specificSegments
+    )
     {
       final Optional<? extends TimelineLookup<String, ServerSelector>> maybeTimeline = serverView.getTimeline(
           dataSourceAnalysis
@@ -335,7 +341,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
         computeUncoveredIntervals(timeline);
       }
 
-      final Set<SegmentServerSelector> segmentServers = computeSegmentsToQuery(timeline);
+      final Set<SegmentServerSelector> segmentServers = computeSegmentsToQuery(timeline, specificSegments);
       @Nullable
       final byte[] queryCacheKey = computeQueryCacheKey();
       if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) {
@@ -401,11 +407,16 @@ public class CachingClusteredClient implements QuerySegmentWalker
       }
     }
 
-    private Set<SegmentServerSelector> computeSegmentsToQuery(TimelineLookup<String, ServerSelector> timeline)
+    private Set<SegmentServerSelector> computeSegmentsToQuery(
+        TimelineLookup<String, ServerSelector> timeline,
+        boolean specificSegments
+    )
     {
+      final java.util.function.Function<Interval, List<TimelineObjectHolder<String, ServerSelector>>> lookupFn
+          = specificSegments ? timeline::lookupWithIncompletePartitions : timeline::lookup;
       final List<TimelineObjectHolder<String, ServerSelector>> serversLookup = toolChest.filterSegments(
           query,
-          intervals.stream().flatMap(i -> timeline.lookup(i).stream()).collect(Collectors.toList())
+          intervals.stream().flatMap(i -> lookupFn.apply(i).stream()).collect(Collectors.toList())
       );
 
       final Set<SegmentServerSelector> segments = new LinkedHashSet<>();
diff --git a/server/src/test/java/org/apache/druid/client/TestHttpClient.java b/server/src/test/java/org/apache/druid/client/TestHttpClient.java
index 7828b64..a899cb0 100644
--- a/server/src/test/java/org/apache/druid/client/TestHttpClient.java
+++ b/server/src/test/java/org/apache/druid/client/TestHttpClient.java
@@ -171,7 +171,7 @@ public class TestHttpClient implements HttpClient
     private QueryRunner getQueryRunner()
     {
       if (isSegmentDropped) {
-        return new ReportTimelineMissingSegmentQueryRunner(
+        return new ReportTimelineMissingSegmentQueryRunner<>(
             new SegmentDescriptor(segment.getInterval(), segment.getVersion(), segment.getId().getPartitionNum())
         );
       } else {
diff --git a/server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java b/server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java
index 7ea13f2..3cea53b 100644
--- a/server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java
+++ b/server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.query;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.client.CachingClusteredClient;
@@ -35,7 +36,9 @@ import org.apache.druid.client.cache.MapCache;
 import org.apache.druid.guice.http.DruidHttpClientConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.io.Closer;
@@ -74,7 +77,7 @@ public class RetryQueryRunnerTest
 {
   private static final Closer CLOSER = Closer.create();
   private static final String DATASOURCE = "datasource";
-  private static final GeneratorSchemaInfo SCHEMA_INFO = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
+  private static final GeneratorSchemaInfo BASE_SCHEMA_INFO = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
   private static final boolean USE_PARALLEL_MERGE_POOL_CONFIGURED = false;
 
   @Rule
@@ -148,7 +151,7 @@ public class RetryQueryRunnerTest
   public void testNoRetry()
   {
     prepareCluster(10);
-    final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval());
     final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = createQueryRunner(
         newRetryQueryRunnerConfig(1, false),
         query,
@@ -165,7 +168,7 @@ public class RetryQueryRunnerTest
   public void testRetryForMovedSegment()
   {
     prepareCluster(10);
-    final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval());
     final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = createQueryRunner(
         newRetryQueryRunnerConfig(1, true),
         query,
@@ -189,7 +192,7 @@ public class RetryQueryRunnerTest
   public void testRetryUntilWeGetFullResult()
   {
     prepareCluster(10);
-    final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval());
     final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = createQueryRunner(
         newRetryQueryRunnerConfig(100, false), // retry up to 100
         query,
@@ -209,7 +212,7 @@ public class RetryQueryRunnerTest
   public void testFailWithPartialResultsAfterRetry()
   {
     prepareCluster(10);
-    final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval());
     final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = createQueryRunner(
         newRetryQueryRunnerConfig(1, false),
         query,
@@ -229,12 +232,26 @@ public class RetryQueryRunnerTest
 
   private void prepareCluster(int numServers)
   {
+    Preconditions.checkArgument(numServers < 25, "Cannot be larger than 24");
     for (int i = 0; i < numServers; i++) {
-      final DataSegment segment = newSegment(SCHEMA_INFO.getDataInterval(), i);
+      final int partitionId = i % 2;
+      final int intervalIndex = i / 2;
+      final Interval interval = Intervals.of("2000-01-01T%02d/PT1H", intervalIndex);
+      final DataSegment segment = newSegment(interval, partitionId, 2);
       addServer(
           SimpleServerView.createServer(i + 1),
           segment,
-          segmentGenerator.generate(segment, SCHEMA_INFO, Granularities.NONE, 10)
+          segmentGenerator.generate(
+              segment,
+              new GeneratorSchemaInfo(
+                  BASE_SCHEMA_INFO.getColumnSchemas(),
+                  BASE_SCHEMA_INFO.getAggs(),
+                  interval,
+                  BASE_SCHEMA_INFO.isWithRollup()
+              ),
+              Granularities.NONE,
+              10
+          )
       );
     }
   }
@@ -315,7 +332,7 @@ public class RetryQueryRunnerTest
     return Druids.newTimeseriesQueryBuilder()
                  .dataSource(DATASOURCE)
                  .intervals(ImmutableList.of(interval))
-                 .granularity(Granularities.DAY)
+                 .granularity(Granularities.HOUR)
                  .aggregators(new CountAggregatorFactory("rows"))
                  .context(
                      ImmutableMap.of(
@@ -338,20 +355,26 @@ public class RetryQueryRunnerTest
   {
     return IntStream
         .range(0, expectedNumResultRows)
-        .mapToObj(i -> new Result<>(DateTimes.of("2000-01-01"), new TimeseriesResultValue(ImmutableMap.of("rows", 10))))
+        .mapToObj(
+            i -> new Result<>(
+                DateTimes.of(StringUtils.format("2000-01-01T%02d", i / 2)),
+                new TimeseriesResultValue(ImmutableMap.of("rows", 10))
+            )
+        )
         .collect(Collectors.toList());
   }
 
   private static DataSegment newSegment(
       Interval interval,
-      int partitionId
+      int partitionId,
+      int numCorePartitions
   )
   {
     return DataSegment.builder()
                       .dataSource(DATASOURCE)
                       .interval(interval)
                       .version("1")
-                      .shardSpec(new NumberedShardSpec(partitionId, 0))
-                      .shardSpec(new NumberedShardSpec(partitionId, 0))
+                      .shardSpec(new NumberedShardSpec(partitionId, numCorePartitions))
                       .size(10)
                       .build();
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to