This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch 0.19.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/0.19.0 by this push:
new ce61f49 Fix CachingClusteredClient when querying specific segments (#10125) (#10132)
ce61f49 is described below
commit ce61f49cc55cd775cfd359d871253e1110daec7e
Author: Jihoon Son <[email protected]>
AuthorDate: Thu Jul 2 19:46:05 2020 -0700
Fix CachingClusteredClient when querying specific segments (#10125) (#10132)
* Fix CachingClusteredClient when querying specific segments
* delete useless test
* roll back timeout
---
.../druid/client/CachingClusteredClient.java | 27 +++++++++----
.../org/apache/druid/client/TestHttpClient.java | 2 +-
.../apache/druid/query/RetryQueryRunnerTest.java | 45 ++++++++++++++++------
3 files changed, 54 insertions(+), 20 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 2922f2f..8dba7de 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -175,7 +175,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
{
- return CachingClusteredClient.this.run(queryPlus, responseContext, timeline -> timeline);
+ return CachingClusteredClient.this.run(queryPlus, responseContext, timeline -> timeline, false);
}
};
}
@@ -187,10 +187,12 @@ public class CachingClusteredClient implements QuerySegmentWalker
private <T> Sequence<T> run(
final QueryPlus<T> queryPlus,
final ResponseContext responseContext,
- final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter
+ final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter,
+ final boolean specificSegments
)
{
- final ClusterQueryResult<T> result = new SpecificQueryRunnable<>(queryPlus, responseContext).run(timelineConverter);
+ final ClusterQueryResult<T> result = new SpecificQueryRunnable<>(queryPlus, responseContext)
+ .run(timelineConverter, specificSegments);
initializeNumRemainingResponsesInResponseContext(queryPlus.getQuery(), responseContext, result.numQueryServers);
return result.sequence;
}
@@ -231,7 +233,8 @@ public class CachingClusteredClient implements QuerySegmentWalker
}
}
return timeline2;
- }
+ },
+ true
);
}
};
@@ -321,7 +324,10 @@ public class CachingClusteredClient implements QuerySegmentWalker
* @return a pair of a sequence merging results from remote query servers and the number of remote servers
* participating in query processing.
*/
- ClusterQueryResult<T> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
+ ClusterQueryResult<T> run(
+ final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter,
+ final boolean specificSegments
+ )
{
final Optional<? extends TimelineLookup<String, ServerSelector>> maybeTimeline = serverView.getTimeline(
dataSourceAnalysis
@@ -335,7 +341,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
computeUncoveredIntervals(timeline);
}
- final Set<SegmentServerSelector> segmentServers = computeSegmentsToQuery(timeline);
+ final Set<SegmentServerSelector> segmentServers = computeSegmentsToQuery(timeline, specificSegments);
@Nullable
final byte[] queryCacheKey = computeQueryCacheKey();
if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) {
@@ -401,11 +407,16 @@ public class CachingClusteredClient implements QuerySegmentWalker
}
}
- private Set<SegmentServerSelector> computeSegmentsToQuery(TimelineLookup<String, ServerSelector> timeline)
+ private Set<SegmentServerSelector> computeSegmentsToQuery(
+ TimelineLookup<String, ServerSelector> timeline,
+ boolean specificSegments
+ )
{
+ final java.util.function.Function<Interval, List<TimelineObjectHolder<String, ServerSelector>>> lookupFn
+ = specificSegments ? timeline::lookupWithIncompletePartitions : timeline::lookup;
final List<TimelineObjectHolder<String, ServerSelector>> serversLookup = toolChest.filterSegments(
query,
- intervals.stream().flatMap(i -> timeline.lookup(i).stream()).collect(Collectors.toList())
+ intervals.stream().flatMap(i -> lookupFn.apply(i).stream()).collect(Collectors.toList())
);
final Set<SegmentServerSelector> segments = new LinkedHashSet<>();
diff --git a/server/src/test/java/org/apache/druid/client/TestHttpClient.java b/server/src/test/java/org/apache/druid/client/TestHttpClient.java
index 7828b64..a899cb0 100644
--- a/server/src/test/java/org/apache/druid/client/TestHttpClient.java
+++ b/server/src/test/java/org/apache/druid/client/TestHttpClient.java
@@ -171,7 +171,7 @@ public class TestHttpClient implements HttpClient
private QueryRunner getQueryRunner()
{
if (isSegmentDropped) {
- return new ReportTimelineMissingSegmentQueryRunner(
+ return new ReportTimelineMissingSegmentQueryRunner<>(
new SegmentDescriptor(segment.getInterval(), segment.getVersion(), segment.getId().getPartitionNum())
);
} else {
diff --git a/server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java b/server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java
index 7ea13f2..3cea53b 100644
--- a/server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java
+++ b/server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.query;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.CachingClusteredClient;
@@ -35,7 +36,9 @@ import org.apache.druid.client.cache.MapCache;
import org.apache.druid.guice.http.DruidHttpClientConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
@@ -74,7 +77,7 @@ public class RetryQueryRunnerTest
{
private static final Closer CLOSER = Closer.create();
private static final String DATASOURCE = "datasource";
- private static final GeneratorSchemaInfo SCHEMA_INFO = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
+ private static final GeneratorSchemaInfo BASE_SCHEMA_INFO = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
private static final boolean USE_PARALLEL_MERGE_POOL_CONFIGURED = false;
@Rule
@@ -148,7 +151,7 @@ public class RetryQueryRunnerTest
public void testNoRetry()
{
prepareCluster(10);
- final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(SCHEMA_INFO.getDataInterval());
+ final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval());
final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = createQueryRunner(
newRetryQueryRunnerConfig(1, false),
query,
@@ -165,7 +168,7 @@ public class RetryQueryRunnerTest
public void testRetryForMovedSegment()
{
prepareCluster(10);
- final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(SCHEMA_INFO.getDataInterval());
+ final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval());
final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = createQueryRunner(
newRetryQueryRunnerConfig(1, true),
query,
@@ -189,7 +192,7 @@ public class RetryQueryRunnerTest
public void testRetryUntilWeGetFullResult()
{
prepareCluster(10);
- final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(SCHEMA_INFO.getDataInterval());
+ final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval());
final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = createQueryRunner(
newRetryQueryRunnerConfig(100, false), // retry up to 100
query,
@@ -209,7 +212,7 @@ public class RetryQueryRunnerTest
public void testFailWithPartialResultsAfterRetry()
{
prepareCluster(10);
- final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(SCHEMA_INFO.getDataInterval());
+ final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval());
final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = createQueryRunner(
newRetryQueryRunnerConfig(1, false),
query,
@@ -229,12 +232,26 @@ public class RetryQueryRunnerTest
private void prepareCluster(int numServers)
{
+ Preconditions.checkArgument(numServers < 25, "Cannot be larger than 24");
for (int i = 0; i < numServers; i++) {
- final DataSegment segment = newSegment(SCHEMA_INFO.getDataInterval(), i);
+ final int partitionId = i % 2;
+ final int intervalIndex = i / 2;
+ final Interval interval = Intervals.of("2000-01-01T%02d/PT1H", intervalIndex);
+ final DataSegment segment = newSegment(interval, partitionId, 2);
addServer(
SimpleServerView.createServer(i + 1),
segment,
- segmentGenerator.generate(segment, SCHEMA_INFO, Granularities.NONE, 10)
+ segmentGenerator.generate(
+ segment,
+ new GeneratorSchemaInfo(
+ BASE_SCHEMA_INFO.getColumnSchemas(),
+ BASE_SCHEMA_INFO.getAggs(),
+ interval,
+ BASE_SCHEMA_INFO.isWithRollup()
+ ),
+ Granularities.NONE,
+ 10
+ )
);
}
}
@@ -315,7 +332,7 @@ public class RetryQueryRunnerTest
return Druids.newTimeseriesQueryBuilder()
.dataSource(DATASOURCE)
.intervals(ImmutableList.of(interval))
- .granularity(Granularities.DAY)
+ .granularity(Granularities.HOUR)
.aggregators(new CountAggregatorFactory("rows"))
.context(
ImmutableMap.of(
@@ -338,20 +355,26 @@ public class RetryQueryRunnerTest
{
return IntStream
.range(0, expectedNumResultRows)
- .mapToObj(i -> new Result<>(DateTimes.of("2000-01-01"), new TimeseriesResultValue(ImmutableMap.of("rows", 10))))
+ .mapToObj(
+ i -> new Result<>(
+ DateTimes.of(StringUtils.format("2000-01-01T%02d", i / 2)),
+ new TimeseriesResultValue(ImmutableMap.of("rows", 10))
+ )
+ )
.collect(Collectors.toList());
}
private static DataSegment newSegment(
Interval interval,
- int partitionId
+ int partitionId,
+ int numCorePartitions
)
{
return DataSegment.builder()
.dataSource(DATASOURCE)
.interval(interval)
.version("1")
- .shardSpec(new NumberedShardSpec(partitionId, 0))
- .shardSpec(new NumberedShardSpec(partitionId, numCorePartitions))
.size(10)
.build();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]