This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7baa330 Introduce published segment cache in broker (#6901)
7baa330 is described below
commit 7baa33049cde3c3751037f014f468db4beb41a4c
Author: Surekha <[email protected]>
AuthorDate: Sat Feb 2 22:27:13 2019 -0800
Introduce published segment cache in broker (#6901)
* Add published segment cache in broker
* Change the DataSegment interner so it's not based on DataSegment's equals
only and size is preserved if set
* Added a trueEquals to DataSegment class
* Use separate interner for realtime and historical segments
* Remove trueEquals as it's not used anymore, change log message
* PR comments
* PR comments
* Fix tests
* PR comments
* Few more modification to
* change the coordinator api
* remove all segments at once from MetadataSegmentView in order to serve a
more consistent view of published segments
* Change the poll behaviour to avoid multiple poll execution at same time
* minor changes
* PR comments
* PR comments
* Make the segment cache in broker off by default
* Added a config to PlannerConfig
* Moved MetadataSegmentView to sql module
* Add doc for new planner config
* Update documentation
* PR comments
* some more changes
* PR comments
* fix test
* remove unintentional change, whether to synchronize on lifecycleLock is
still in discussion in PR
* minor changes
* some changes to initialization
* use pollPeriodInMS
* Add boolean cachePopulated to check if first poll succeeds
* Remove poll from start()
* take the log message out of condition in stop()
---
.../apache/druid/benchmark/query/SqlBenchmark.java | 2 +-
docs/content/querying/sql.md | 2 +
.../bloom/sql/BloomFilterSqlAggregatorTest.java | 2 +-
.../histogram/sql/QuantileSqlAggregatorTest.java | 2 +-
.../apache/druid/client/DataSegmentInterner.java | 49 ++++
.../druid/client/selector/ServerSelector.java | 3 +-
.../apache/druid/server/http/MetadataResource.java | 15 +-
.../druid/client/CachingClusteredClientTest.java | 5 +-
.../druid/sql/calcite/planner/PlannerConfig.java | 23 ++
.../sql/calcite/schema/MetadataSegmentView.java | 255 +++++++++++++++++++++
.../druid/sql/calcite/schema/SystemSchema.java | 104 +++------
.../druid/sql/avatica/DruidAvaticaHandlerTest.java | 4 +-
.../druid/sql/avatica/DruidStatementTest.java | 2 +-
.../druid/sql/calcite/BaseCalciteQueryTest.java | 2 +-
.../druid/sql/calcite/http/SqlResourceTest.java | 2 +-
.../druid/sql/calcite/schema/SystemSchemaTest.java | 186 ++++++---------
.../druid/sql/calcite/util/CalciteTests.java | 13 +-
17 files changed, 461 insertions(+), 210 deletions(-)
diff --git
a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java
b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java
index bb19188..1f51588 100644
---
a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java
+++
b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java
@@ -114,7 +114,7 @@ public class SqlBenchmark
final QueryRunnerFactoryConglomerate conglomerate =
conglomerateCloserPair.lhs;
final PlannerConfig plannerConfig = new PlannerConfig();
final DruidSchema druidSchema =
CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
- final SystemSchema systemSchema =
CalciteTests.createMockSystemSchema(druidSchema, walker);
+ final SystemSchema systemSchema =
CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig);
this.walker = new
SpecificSegmentsQuerySegmentWalker(conglomerate).add(dataSegment, index);
final PlannerFactory plannerFactory = new PlannerFactory(
druidSchema,
diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md
index 358c1ec..9a3f03e 100644
--- a/docs/content/querying/sql.md
+++ b/docs/content/querying/sql.md
@@ -698,6 +698,8 @@ The Druid SQL server is configured through the following
properties on the Broke
|`druid.sql.planner.useFallback`|Whether to evaluate operations on the Broker
when they cannot be expressed as Druid queries. This option is not recommended
for production since it can generate unscalable query plans. If false, SQL
queries that cannot be translated to Druid queries will fail.|false|
|`druid.sql.planner.requireTimeCondition`|Whether to require SQL to have
filter conditions on __time column so that all generated native queries will
have user specified intervals. If true, all queries wihout filter condition on
__time column will fail|false|
|`druid.sql.planner.sqlTimeZone`|Sets the default time zone for the server,
which will affect how time functions and timestamp literals behave. Should be a
time zone name like "America/Los_Angeles" or offset like "-08:00".|UTC|
+|`druid.sql.planner.metadataSegmentCacheEnable`|Whether to keep a cache of
published segments in broker. If true, broker polls coordinator in background
to get segments from metadata store and maintains a local cache. If false,
coordinator's REST API will be invoked when broker needs published segments
info.|false|
+|`druid.sql.planner.metadataSegmentPollPeriod`|How often to poll coordinator
for published segments list if `druid.sql.planner.metadataSegmentCacheEnable`
is set to true. Poll period is in milliseconds. |60000|
## SQL Metrics
diff --git
a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
index 4641e27..6b218bb 100644
---
a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
+++
b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
@@ -192,7 +192,7 @@ public class BloomFilterSqlAggregatorTest
final PlannerConfig plannerConfig = new PlannerConfig();
final DruidSchema druidSchema =
CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
- final SystemSchema systemSchema =
CalciteTests.createMockSystemSchema(druidSchema, walker);
+ final SystemSchema systemSchema =
CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig);
final DruidOperatorTable operatorTable = new DruidOperatorTable(
ImmutableSet.of(new BloomFilterSqlAggregator()),
ImmutableSet.of()
diff --git
a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
index 69337b5..7aa4e67 100644
---
a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
+++
b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
@@ -157,7 +157,7 @@ public class QuantileSqlAggregatorTest extends
CalciteTestBase
final PlannerConfig plannerConfig = new PlannerConfig();
final DruidSchema druidSchema =
CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
- final SystemSchema systemSchema =
CalciteTests.createMockSystemSchema(druidSchema, walker);
+ final SystemSchema systemSchema =
CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig);
final DruidOperatorTable operatorTable = new DruidOperatorTable(
ImmutableSet.of(new QuantileSqlAggregator()),
ImmutableSet.of()
diff --git
a/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java
b/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java
new file mode 100644
index 0000000..11d104d
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+import org.apache.druid.timeline.DataSegment;
+
+/**
+ * Interns the DataSegment object in order to share the reference for the same
DataSegment.
+ * It uses two separate interners for realtime and historical segments to
prevent
+ * overwriting the size of a segment which was served by a historical and
later served
+ * by another realtime server, since realtime server always publishes with
size 0.
+ */
+public class DataSegmentInterner
+{
+ private static final Interner<DataSegment> REALTIME_INTERNER =
Interners.newWeakInterner();
+ private static final Interner<DataSegment> HISTORICAL_INTERNER =
Interners.newWeakInterner();
+
+ private DataSegmentInterner()
+ {
+ //No instantiation
+ }
+
+ public static DataSegment intern(DataSegment segment)
+ {
+ // A segment learns its size and dimensions when it moves from a realtime
to a historical server;
+ // for that reason, we are using its size as the indicator to decide
whether to use REALTIME or
+ // HISTORICAL interner.
+ return segment.getSize() > 0 ? HISTORICAL_INTERNER.intern(segment) :
REALTIME_INTERNER.intern(segment);
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java
b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java
index de6b58b..a485dba 100644
--- a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java
+++ b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java
@@ -20,6 +20,7 @@
package org.apache.druid.client.selector;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
+import org.apache.druid.client.DataSegmentInterner;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
@@ -50,7 +51,7 @@ public class ServerSelector implements
DiscoverySelector<QueryableDruidServer>
TierSelectorStrategy strategy
)
{
- this.segment = new AtomicReference<>(segment);
+ this.segment = new AtomicReference<>(DataSegmentInterner.intern(segment));
this.strategy = strategy;
this.historicalServers = new
Int2ObjectRBTreeMap<>(strategy.getComparator());
this.realtimeServers = new Int2ObjectRBTreeMap<>(strategy.getComparator());
diff --git
a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
index d1e83a9..c7e2702 100644
--- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
@@ -57,6 +57,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
@@ -148,14 +149,22 @@ public class MetadataResource
@GET
@Path("/segments")
@Produces(MediaType.APPLICATION_JSON)
- public Response getDatabaseSegments(@Context final HttpServletRequest req)
+ public Response getDatabaseSegments(
+ @Context final HttpServletRequest req,
+ @QueryParam("datasources") final Set<String> datasources
+ )
{
- final Collection<ImmutableDruidDataSource> druidDataSources =
metadataSegmentManager.getDataSources();
+ Collection<ImmutableDruidDataSource> druidDataSources =
metadataSegmentManager.getDataSources();
+ if (datasources != null && !datasources.isEmpty()) {
+ druidDataSources = druidDataSources.stream()
+ .filter(src ->
datasources.contains(src.getName()))
+ .collect(Collectors.toSet());
+ }
final Stream<DataSegment> metadataSegments = druidDataSources
.stream()
.flatMap(t -> t.getSegments().stream());
- Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment ->
Collections.singletonList(
+ final Function<DataSegment, Iterable<ResourceAction>> raGenerator =
segment -> Collections.singletonList(
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
final Iterable<DataSegment> authorizedSegments =
diff --git
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index 2eff321..4f043ee 100644
---
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -2213,13 +2213,13 @@ public class CachingClusteredClientTest
expectedResults.get(k).get(j)
);
serverExpectations.get(lastServer).addExpectation(expectation);
-
+ EasyMock.expect(mockSegment.getSize()).andReturn(0L).anyTimes();
+ EasyMock.replay(mockSegment);
ServerSelector selector = new ServerSelector(
expectation.getSegment(),
new HighestPriorityTierSelectorStrategy(new
RandomServerSelectorStrategy())
);
selector.addServerAndUpdateSegment(new
QueryableDruidServer(lastServer, null), selector.getSegment());
-
final ShardSpec shardSpec;
if (numChunks == 1) {
shardSpec = new SingleDimensionShardSpec("dimAll", null, null, 0);
@@ -2234,6 +2234,7 @@ public class CachingClusteredClientTest
}
shardSpec = new SingleDimensionShardSpec("dim" + k, start, end, j);
}
+ EasyMock.reset(mockSegment);
EasyMock.expect(mockSegment.getShardSpec())
.andReturn(shardSpec)
.anyTimes();
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
index 766bf92..acc4d08 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
@@ -67,6 +67,21 @@ public class PlannerConfig
private DateTimeZone sqlTimeZone = DateTimeZone.UTC;
@JsonProperty
+ private boolean metadataSegmentCacheEnable = false;
+
+ @JsonProperty
+ private long metadataSegmentPollPeriod = 60000;
+
+ public long getMetadataSegmentPollPeriod()
+ {
+ return metadataSegmentPollPeriod;
+ }
+
+ public boolean isMetadataSegmentCacheEnable()
+ {
+ return metadataSegmentCacheEnable;
+ }
+
private boolean serializeComplexValues = true;
public Period getMetadataRefreshPeriod()
@@ -159,6 +174,8 @@ public class PlannerConfig
newConfig.requireTimeCondition = isRequireTimeCondition();
newConfig.sqlTimeZone = getSqlTimeZone();
newConfig.awaitInitializationOnStart = isAwaitInitializationOnStart();
+ newConfig.metadataSegmentCacheEnable = isMetadataSegmentCacheEnable();
+ newConfig.metadataSegmentPollPeriod = getMetadataSegmentPollPeriod();
newConfig.serializeComplexValues = shouldSerializeComplexValues();
return newConfig;
}
@@ -200,6 +217,8 @@ public class PlannerConfig
useFallback == that.useFallback &&
requireTimeCondition == that.requireTimeCondition &&
awaitInitializationOnStart == that.awaitInitializationOnStart &&
+ metadataSegmentCacheEnable == that.metadataSegmentCacheEnable &&
+ metadataSegmentPollPeriod == that.metadataSegmentPollPeriod &&
serializeComplexValues == that.serializeComplexValues &&
Objects.equals(metadataRefreshPeriod, that.metadataRefreshPeriod) &&
Objects.equals(sqlTimeZone, that.sqlTimeZone);
@@ -221,6 +240,8 @@ public class PlannerConfig
requireTimeCondition,
awaitInitializationOnStart,
sqlTimeZone,
+ metadataSegmentCacheEnable,
+ metadataSegmentPollPeriod,
serializeComplexValues
);
}
@@ -239,6 +260,8 @@ public class PlannerConfig
", useFallback=" + useFallback +
", requireTimeCondition=" + requireTimeCondition +
", awaitInitializationOnStart=" + awaitInitializationOnStart +
+ ", metadataSegmentCacheEnable=" + metadataSegmentCacheEnable +
+ ", metadataSegmentPollPeriod=" + metadataSegmentPollPeriod +
", sqlTimeZone=" + sqlTimeZone +
", serializeComplexValues=" + serializeComplexValues +
'}';
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
new file mode 100644
index 0000000..50fe313
--- /dev/null
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.schema;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.BrokerSegmentWatcherConfig;
+import org.apache.druid.client.DataSegmentInterner;
+import org.apache.druid.client.JsonParserIterator;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.concurrent.LifecycleLock;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class polls the coordinator in background to keep the latest published
segments.
+ * Provides {@link #getPublishedSegments()} for others to get segments in
metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+ private static final EmittingLogger log = new
EmittingLogger(MetadataSegmentView.class);
+
+ private final DruidLeaderClient coordinatorDruidLeaderClient;
+ private final ObjectMapper jsonMapper;
+ private final BytesAccumulatingResponseHandler responseHandler;
+ private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+ private final boolean isCacheEnabled;
+ @Nullable
+ private final ConcurrentMap<DataSegment, DateTime> publishedSegments;
+ private final ScheduledExecutorService scheduledExec;
+ private final long pollPeriodInMS;
+ private final LifecycleLock lifecycleLock = new LifecycleLock();
+ private final AtomicBoolean cachePopulated = new AtomicBoolean(false);
+
+ @Inject
+ public MetadataSegmentView(
+ final @Coordinator DruidLeaderClient druidLeaderClient,
+ final ObjectMapper jsonMapper,
+ final BytesAccumulatingResponseHandler responseHandler,
+ final BrokerSegmentWatcherConfig segmentWatcherConfig,
+ final PlannerConfig plannerConfig
+ )
+ {
+ Preconditions.checkNotNull(plannerConfig, "plannerConfig");
+ this.coordinatorDruidLeaderClient = druidLeaderClient;
+ this.jsonMapper = jsonMapper;
+ this.responseHandler = responseHandler;
+ this.segmentWatcherConfig = segmentWatcherConfig;
+ this.isCacheEnabled = plannerConfig.isMetadataSegmentCacheEnable();
+ this.pollPeriodInMS = plannerConfig.getMetadataSegmentPollPeriod();
+ this.publishedSegments = isCacheEnabled ? new ConcurrentHashMap<>(1000) :
null;
+ this.scheduledExec =
Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+ }
+
+ @LifecycleStart
+ public void start()
+ {
+ if (!lifecycleLock.canStart()) {
+ throw new ISE("can't start.");
+ }
+ try {
+ if (isCacheEnabled) {
+ scheduledExec.schedule(new PollTask(), pollPeriodInMS,
TimeUnit.MILLISECONDS);
+ }
+ lifecycleLock.started();
+ log.info("MetadataSegmentView Started.");
+ }
+ finally {
+ lifecycleLock.exitStart();
+ }
+ }
+
+ @LifecycleStop
+ public void stop()
+ {
+ if (!lifecycleLock.canStop()) {
+ throw new ISE("can't stop.");
+ }
+ log.info("MetadataSegmentView is stopping.");
+ if (isCacheEnabled) {
+ scheduledExec.shutdown();
+ }
+ log.info("MetadataSegmentView Stopped.");
+ }
+
+ private void poll()
+ {
+ log.info("polling published segments from coordinator");
+ final JsonParserIterator<DataSegment> metadataSegments =
getMetadataSegments(
+ coordinatorDruidLeaderClient,
+ jsonMapper,
+ responseHandler,
+ segmentWatcherConfig.getWatchedDataSources()
+ );
+
+ final DateTime timestamp = DateTimes.nowUtc();
+ while (metadataSegments.hasNext()) {
+ final DataSegment interned =
DataSegmentInterner.intern(metadataSegments.next());
+ // timestamp is used to filter deleted segments
+ publishedSegments.put(interned, timestamp);
+ }
+ // Remove the segments from the cache whose timestamp is not equal to the latest
timestamp stored,
+ // since the presence of a segment with an earlier timestamp indicates that
+ // the segment was not returned by the coordinator in the latest poll, so it is
+ // likely deleted and therefore we remove it from publishedSegments.
+ // Segments are not atomically replaced because that could cause a high
+ // memory footprint due to the large number of published segments, so
+ // we incrementally remove deleted segments from the map.
+ // This means publishedSegments will be eventually consistent with
+ // the segments in the coordinator.
+ publishedSegments.entrySet().removeIf(e -> e.getValue() != timestamp);
+ cachePopulated.set(true);
+ }
+
+ public Iterator<DataSegment> getPublishedSegments()
+ {
+ if (isCacheEnabled) {
+ Preconditions.checkState(
+ lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS) &&
cachePopulated.get(),
+ "hold on, still syncing published segments"
+ );
+ return publishedSegments.keySet().iterator();
+ } else {
+ return getMetadataSegments(
+ coordinatorDruidLeaderClient,
+ jsonMapper,
+ responseHandler,
+ segmentWatcherConfig.getWatchedDataSources()
+ );
+ }
+ }
+
+ // Note that coordinator must be up to get segments
+ private JsonParserIterator<DataSegment> getMetadataSegments(
+ DruidLeaderClient coordinatorClient,
+ ObjectMapper jsonMapper,
+ BytesAccumulatingResponseHandler responseHandler,
+ Set<String> watchedDataSources
+ )
+ {
+ String query = "/druid/coordinator/v1/metadata/segments";
+ if (watchedDataSources != null && !watchedDataSources.isEmpty()) {
+ log.debug(
+ "filtering datasources in published segments based on broker's
watchedDataSources[%s]", watchedDataSources);
+ final StringBuilder sb = new StringBuilder();
+ for (String ds : watchedDataSources) {
+ sb.append("datasources=").append(ds).append("&");
+ }
+ sb.setLength(sb.length() - 1);
+ query = "/druid/coordinator/v1/metadata/segments?" + sb;
+ }
+ Request request;
+ try {
+ request = coordinatorClient.makeRequest(
+ HttpMethod.GET,
+ StringUtils.format(query),
+ false
+ );
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ ListenableFuture<InputStream> future = coordinatorClient.goAsync(
+ request,
+ responseHandler
+ );
+
+ final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new
TypeReference<DataSegment>()
+ {
+ });
+ return new JsonParserIterator<>(
+ typeRef,
+ future,
+ request.getUrl().toString(),
+ null,
+ request.getUrl().getHost(),
+ jsonMapper,
+ responseHandler
+ );
+ }
+
+ private class PollTask implements Runnable
+ {
+ @Override
+ public void run()
+ {
+ long delayMS = pollPeriodInMS;
+ try {
+ final long pollStartTime = System.nanoTime();
+ poll();
+ final long pollEndTime = System.nanoTime();
+ final long pollTimeNS = pollEndTime - pollStartTime;
+ final long pollTimeMS = TimeUnit.NANOSECONDS.toMillis(pollTimeNS);
+ delayMS = Math.max(pollPeriodInMS - pollTimeMS, 0);
+ }
+ catch (Exception e) {
+ log.makeAlert(e, "Problem polling Coordinator.").emit();
+ }
+ finally {
+ if (!Thread.currentThread().isInterrupted()) {
+ scheduledExec.schedule(new PollTask(), delayMS,
TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+ }
+
+}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 772c628..d0599f8 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -149,6 +149,7 @@ public class SystemSchema extends AbstractSchema
@Inject
public SystemSchema(
final DruidSchema druidSchema,
+ final MetadataSegmentView metadataView,
final TimelineServerView serverView,
final AuthorizerMapper authorizerMapper,
final @Coordinator DruidLeaderClient coordinatorDruidLeaderClient,
@@ -158,11 +159,10 @@ public class SystemSchema extends AbstractSchema
{
Preconditions.checkNotNull(serverView, "serverView");
BytesAccumulatingResponseHandler responseHandler = new
BytesAccumulatingResponseHandler();
- SegmentsTable segmentsTable = new SegmentsTable(
+ final SegmentsTable segmentsTable = new SegmentsTable(
druidSchema,
- coordinatorDruidLeaderClient,
+ metadataView,
jsonMapper,
- responseHandler,
authorizerMapper
);
this.tableMap = ImmutableMap.of(
@@ -182,23 +182,20 @@ public class SystemSchema extends AbstractSchema
static class SegmentsTable extends AbstractTable implements ScannableTable
{
private final DruidSchema druidSchema;
- private final DruidLeaderClient druidLeaderClient;
private final ObjectMapper jsonMapper;
- private final BytesAccumulatingResponseHandler responseHandler;
private final AuthorizerMapper authorizerMapper;
+ private final MetadataSegmentView metadataView;
public SegmentsTable(
DruidSchema druidSchemna,
- DruidLeaderClient druidLeaderClient,
+ MetadataSegmentView metadataView,
ObjectMapper jsonMapper,
- BytesAccumulatingResponseHandler responseHandler,
AuthorizerMapper authorizerMapper
)
{
this.druidSchema = druidSchemna;
- this.druidLeaderClient = druidLeaderClient;
+ this.metadataView = metadataView;
this.jsonMapper = jsonMapper;
- this.responseHandler = responseHandler;
this.authorizerMapper = authorizerMapper;
}
@@ -231,12 +228,8 @@ public class SystemSchema extends AbstractSchema
partialSegmentDataMap.put(h.getSegmentId(), partialSegmentData);
}
- //get published segments from coordinator
- final JsonParserIterator<DataSegment> metadataSegments =
getMetadataSegments(
- druidLeaderClient,
- jsonMapper,
- responseHandler
- );
+ //get published segments from metadata segment cache (if enabled in sql
planner config), else directly from coordinator
+ final Iterator<DataSegment> metadataSegments =
metadataView.getPublishedSegments();
final Set<SegmentId> segmentsAlreadySeen = new HashSet<>();
@@ -245,7 +238,7 @@ public class SystemSchema extends AbstractSchema
metadataSegments,
root
))
- .transform((DataSegment val) -> {
+ .transform(val -> {
try {
segmentsAlreadySeen.add(val.getId());
final PartialSegmentData partialSegmentData =
partialSegmentDataMap.get(val.getId());
@@ -318,6 +311,26 @@ public class SystemSchema extends AbstractSchema
}
+ private Iterator<DataSegment> getAuthorizedPublishedSegments(
+ Iterator<DataSegment> it,
+ DataContext root
+ )
+ {
+ final AuthenticationResult authenticationResult =
+ (AuthenticationResult)
root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
+
+ Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment ->
Collections.singletonList(
+
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
+
+ final Iterable<DataSegment> authorizedSegments =
AuthorizationUtils.filterAuthorizedResources(
+ authenticationResult,
+ () -> it,
+ raGenerator,
+ authorizerMapper
+ );
+ return authorizedSegments.iterator();
+ }
+
private Iterator<Entry<DataSegment, SegmentMetadataHolder>>
getAuthorizedAvailableSegments(
Iterator<Entry<DataSegment, SegmentMetadataHolder>>
availableSegmentEntries,
DataContext root
@@ -340,27 +353,6 @@ public class SystemSchema extends AbstractSchema
return authorizedSegments.iterator();
}
- private CloseableIterator<DataSegment> getAuthorizedPublishedSegments(
- JsonParserIterator<DataSegment> it,
- DataContext root
- )
- {
- final AuthenticationResult authenticationResult =
- (AuthenticationResult)
root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
-
- Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment ->
Collections.singletonList(
-
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
-
- final Iterable<DataSegment> authorizedSegments =
AuthorizationUtils.filterAuthorizedResources(
- authenticationResult,
- () -> it,
- raGenerator,
- authorizerMapper
- );
-
- return wrap(authorizedSegments.iterator(), it);
- }
-
private static class PartialSegmentData
{
private final long isAvailable;
@@ -404,44 +396,6 @@ public class SystemSchema extends AbstractSchema
}
}
- // Note that coordinator must be up to get segments
- private static JsonParserIterator<DataSegment> getMetadataSegments(
- DruidLeaderClient coordinatorClient,
- ObjectMapper jsonMapper,
- BytesAccumulatingResponseHandler responseHandler
- )
- {
-
- Request request;
- try {
- request = coordinatorClient.makeRequest(
- HttpMethod.GET,
- StringUtils.format("/druid/coordinator/v1/metadata/segments"),
- false
- );
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- ListenableFuture<InputStream> future = coordinatorClient.goAsync(
- request,
- responseHandler
- );
-
- final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new
TypeReference<DataSegment>()
- {
- });
- return new JsonParserIterator<>(
- typeRef,
- future,
- request.getUrl().toString(),
- null,
- request.getUrl().getHost(),
- jsonMapper,
- responseHandler
- );
- }
-
static class ServersTable extends AbstractTable implements ScannableTable
{
private final TimelineServerView serverView;
diff --git
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
index c1c43b5..4f25a66 100644
---
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -159,7 +159,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
walker = CalciteTests.createMockWalker(conglomerate,
temporaryFolder.newFolder());
final PlannerConfig plannerConfig = new PlannerConfig();
final DruidSchema druidSchema =
CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
- final SystemSchema systemSchema =
CalciteTests.createMockSystemSchema(druidSchema, walker);
+ final SystemSchema systemSchema =
CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig);
final DruidOperatorTable operatorTable =
CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
@@ -790,7 +790,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
final PlannerConfig plannerConfig = new PlannerConfig();
final DruidSchema druidSchema =
CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
- final SystemSchema systemSchema =
CalciteTests.createMockSystemSchema(druidSchema, walker);
+ final SystemSchema systemSchema =
CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig);
final DruidOperatorTable operatorTable =
CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
final List<Meta.Frame> frames = new ArrayList<>();
diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
index 7438612..9cc2ecf 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
@@ -88,7 +88,7 @@ public class DruidStatementTest extends CalciteTestBase
walker = CalciteTests.createMockWalker(conglomerate,
temporaryFolder.newFolder());
final PlannerConfig plannerConfig = new PlannerConfig();
final DruidSchema druidSchema =
CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
- final SystemSchema systemSchema =
CalciteTests.createMockSystemSchema(druidSchema, walker);
+ final SystemSchema systemSchema =
CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig);
final DruidOperatorTable operatorTable =
CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
final PlannerFactory plannerFactory = new PlannerFactory(
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index 6b47b42..18c1ac4 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -547,7 +547,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
{
final InProcessViewManager viewManager = new
InProcessViewManager(CalciteTests.TEST_AUTHENTICATOR_ESCALATOR);
final DruidSchema druidSchema =
CalciteTests.createMockSchema(conglomerate, walker, plannerConfig, viewManager);
- final SystemSchema systemSchema =
CalciteTests.createMockSystemSchema(druidSchema, walker);
+ final SystemSchema systemSchema =
CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig);
final PlannerFactory plannerFactory = new PlannerFactory(
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java
index 14b2268..d525fff 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java
@@ -121,7 +121,7 @@ public class SqlResourceTest extends CalciteTestBase
}
};
final DruidSchema druidSchema =
CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
- final SystemSchema systemSchema =
CalciteTests.createMockSystemSchema(druidSchema, walker);
+ final SystemSchema systemSchema =
CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig);
final DruidOperatorTable operatorTable =
CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
req = EasyMock.createStrictMock(HttpServletRequest.class);
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 407ff69..7d8cdaa 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -98,6 +98,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class SystemSchemaTest extends CalciteTestBase
{
@@ -127,6 +130,7 @@ public class SystemSchemaTest extends CalciteTestBase
private AuthorizerMapper authMapper;
private static QueryRunnerFactoryConglomerate conglomerate;
private static Closer resourceCloser;
+ private MetadataSegmentView metadataView;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -215,8 +219,10 @@ public class SystemSchemaTest extends CalciteTestBase
);
druidSchema.start();
druidSchema.awaitInitialization();
+ metadataView = EasyMock.createMock(MetadataSegmentView.class);
schema = new SystemSchema(
druidSchema,
+ metadataView,
serverView,
EasyMock.createStrictMock(AuthorizerMapper.class),
client,
@@ -225,6 +231,44 @@ public class SystemSchemaTest extends CalciteTestBase
);
}
+
+ private final DataSegment publishedSegment1 = new DataSegment(
+ "wikipedia1",
+ Intervals.of("2007/2008"),
+ "version1",
+ null,
+ ImmutableList.of("dim1", "dim2"),
+ ImmutableList.of("met1", "met2"),
+ null,
+ 1,
+ 53000L,
+ DataSegment.PruneLoadSpecHolder.DEFAULT
+ );
+ private final DataSegment publishedSegment2 = new DataSegment(
+ "wikipedia2",
+ Intervals.of("2008/2009"),
+ "version2",
+ null,
+ ImmutableList.of("dim1", "dim2"),
+ ImmutableList.of("met1", "met2"),
+ null,
+ 1,
+ 83000L,
+ DataSegment.PruneLoadSpecHolder.DEFAULT
+ );
+ private final DataSegment publishedSegment3 = new DataSegment(
+ "wikipedia3",
+ Intervals.of("2009/2010"),
+ "version3",
+ null,
+ ImmutableList.of("dim1", "dim2"),
+ ImmutableList.of("met1", "met2"),
+ null,
+ 1,
+ 47000L,
+ DataSegment.PruneLoadSpecHolder.DEFAULT
+ );
+
private final DataSegment segment1 = new DataSegment(
"test1",
Intervals.of("2010/2011"),
@@ -263,7 +307,7 @@ public class SystemSchemaTest extends CalciteTestBase
);
private final DataSegment segment4 = new DataSegment(
"test4",
- Intervals.of("2017/2018"),
+ Intervals.of("2014/2015"),
"version4",
null,
ImmutableList.of("dim1", "dim2"),
@@ -275,7 +319,7 @@ public class SystemSchemaTest extends CalciteTestBase
);
private final DataSegment segment5 = new DataSegment(
"test5",
- Intervals.of("2017/2018"),
+ Intervals.of("2015/2016"),
"version5",
null,
ImmutableList.of("dim1", "dim2"),
@@ -340,120 +384,22 @@ public class SystemSchemaTest extends CalciteTestBase
}
@Test
- public void testSegmentsTable() throws Exception
+ public void testSegmentsTable()
{
final SystemSchema.SegmentsTable segmentsTable = EasyMock
.createMockBuilder(SystemSchema.SegmentsTable.class)
- .withConstructor(druidSchema, client, mapper, responseHandler, authMapper)
+ .withConstructor(druidSchema, metadataView, mapper, authMapper)
.createMock();
EasyMock.replay(segmentsTable);
-
- EasyMock
- .expect(client.makeRequest(HttpMethod.GET,
"/druid/coordinator/v1/metadata/segments", false))
- .andReturn(request)
- .anyTimes();
- SettableFuture<InputStream> future = SettableFuture.create();
- EasyMock.expect(client.goAsync(request,
responseHandler)).andReturn(future).once();
- final int ok = HttpServletResponse.SC_OK;
- EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes();
-
- EasyMock
- .expect(request.getUrl())
- .andReturn(new
URL("http://test-host:1234/druid/coordinator/v1/metadata/segments"))
- .anyTimes();
-
- AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();
- //segments in metadata store : wikipedia1, wikipedia2, wikipedia3, test1, test2
- final String json = "[{\n"
- + "\t\"dataSource\": \"wikipedia1\",\n"
- + "\t\"interval\":
\"2018-08-07T23:00:00.000Z/2018-08-08T00:00:00.000Z\",\n"
- + "\t\"version\": \"2018-08-07T23:00:00.059Z\",\n"
- + "\t\"loadSpec\": {\n"
- + "\t\t\"type\": \"local\",\n"
- + "\t\t\"path\":
\"/var/druid/segments/wikipedia-kafka/2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z/2018-08-07T23:00:00.059Z/51/1578eb79-0e44-4b41-a87b-65e40c52be53/index.zip\"\n"
- + "\t},\n"
- + "\t\"dimensions\":
\"isRobot,channel,flags,isUnpatrolled,page,diffUrl,comment,isNew,isMinor,user,namespace,commentLength,deltaBucket,cityName,countryIsoCode,countryName,isAnonymous,regionIsoCode,regionName,added,deleted,delta\",\n"
- + "\t\"metrics\": \"count,user_unique\",\n"
- + "\t\"shardSpec\": {\n"
- + "\t\t\"type\": \"none\",\n"
- + "\t\t\"partitionNum\": 51,\n"
- + "\t\t\"partitions\": 0\n"
- + "\t},\n"
- + "\t\"binaryVersion\": 9,\n"
- + "\t\"size\": 47406,\n"
- + "\t\"identifier\":
\"wikipedia-kafka_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z_51\"\n"
- + "}, {\n"
- + "\t\"dataSource\": \"wikipedia2\",\n"
- + "\t\"interval\":
\"2018-08-07T18:00:00.000Z/2018-08-07T19:00:00.000Z\",\n"
- + "\t\"version\": \"2018-08-07T18:00:00.117Z\",\n"
- + "\t\"loadSpec\": {\n"
- + "\t\t\"type\": \"local\",\n"
- + "\t\t\"path\":
\"/var/druid/segments/wikipedia-kafka/2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z/2018-08-07T18:00:00.117Z/9/a2646827-b782-424c-9eed-e48aa448d2c5/index.zip\"\n"
- + "\t},\n"
- + "\t\"dimensions\":
\"isRobot,channel,flags,isUnpatrolled,page,diffUrl,comment,isNew,isMinor,user,namespace,commentLength,deltaBucket,cityName,countryIsoCode,countryName,isAnonymous,metroCode,regionIsoCode,regionName,added,deleted,delta\",\n"
- + "\t\"metrics\": \"count,user_unique\",\n"
- + "\t\"shardSpec\": {\n"
- + "\t\t\"type\": \"none\",\n"
- + "\t\t\"partitionNum\": 9,\n"
- + "\t\t\"partitions\": 0\n"
- + "\t},\n"
- + "\t\"binaryVersion\": 9,\n"
- + "\t\"size\": 83846,\n"
- + "\t\"identifier\":
\"wikipedia-kafka_2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z_2018-08-07T18:00:00.117Z_9\"\n"
- + "}, {\n"
- + "\t\"dataSource\": \"wikipedia3\",\n"
- + "\t\"interval\":
\"2018-08-07T23:00:00.000Z/2018-08-08T00:00:00.000Z\",\n"
- + "\t\"version\": \"2018-08-07T23:00:00.059Z\",\n"
- + "\t\"loadSpec\": {\n"
- + "\t\t\"type\": \"local\",\n"
- + "\t\t\"path\":
\"/var/druid/segments/wikipedia-kafka/2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z/2018-08-07T23:00:00.059Z/50/87c5457e-c39b-4c03-9df8-e2b20b210dfc/index.zip\"\n"
- + "\t},\n"
- + "\t\"dimensions\":
\"isRobot,channel,flags,isUnpatrolled,page,diffUrl,comment,isNew,isMinor,user,namespace,commentLength,deltaBucket,cityName,countryIsoCode,countryName,isAnonymous,metroCode,regionIsoCode,regionName,added,deleted,delta\",\n"
- + "\t\"metrics\": \"count,user_unique\",\n"
- + "\t\"shardSpec\": {\n"
- + "\t\t\"type\": \"none\",\n"
- + "\t\t\"partitionNum\": 50,\n"
- + "\t\t\"partitions\": 0\n"
- + "\t},\n"
- + "\t\"binaryVersion\": 9,\n"
- + "\t\"size\": 53527,\n"
- + "\t\"identifier\":
\"wikipedia-kafka_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z_50\"\n"
- + "}, {\n"
- + "\t\"dataSource\": \"test1\",\n"
- + "\t\"interval\":
\"2010-01-01T00:00:00.000Z/2011-01-01T00:00:00.000Z\",\n"
- + "\t\"version\": \"version1\",\n"
- + "\t\"loadSpec\": null,\n"
- + "\t\"dimensions\": \"dim1,dim2\",\n"
- + "\t\"metrics\": \"met1,met2\",\n"
- + "\t\"shardSpec\": {\n"
- + "\t\t\"type\": \"none\",\n"
- + "\t\t\"domainDimensions\": []\n"
- + "\t},\n"
- + "\t\"binaryVersion\": 1,\n"
- + "\t\"size\": 100,\n"
- + "\t\"identifier\":
\"test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1\"\n"
- + "}, {\n"
- + "\t\"dataSource\": \"test2\",\n"
- + "\t\"interval\":
\"2011-01-01T00:00:00.000Z/2012-01-01T00:00:00.000Z\",\n"
- + "\t\"version\": \"version2\",\n"
- + "\t\"loadSpec\": null,\n"
- + "\t\"dimensions\": \"dim1,dim2\",\n"
- + "\t\"metrics\": \"met1,met2\",\n"
- + "\t\"shardSpec\": {\n"
- + "\t\t\"type\": \"none\",\n"
- + "\t\t\"domainDimensions\": []\n"
- + "\t},\n"
- + "\t\"binaryVersion\": 1,\n"
- + "\t\"size\": 100,\n"
- + "\t\"identifier\":
\"test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2\"\n"
- + "}]";
- byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8);
- in.add(bytesToWrite);
- in.done();
- future.set(in);
-
- EasyMock.replay(client, request, responseHolder, responseHandler);
+ final Set<DataSegment> publishedSegments = Stream.of(publishedSegment1,
+ publishedSegment2,
+ publishedSegment3,
+ segment1,
+ segment2).collect(Collectors.toSet());
+ EasyMock.expect(metadataView.getPublishedSegments()).andReturn(publishedSegments.iterator()).once();
+
+ EasyMock.replay(client, request, responseHolder, responseHandler, metadataView);
DataContext dataContext = new DataContext()
{
@Override
@@ -531,7 +477,7 @@ public class SystemSchemaTest extends CalciteTestBase
verifyRow(
rows.get(3),
- "test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4",
+ "test4_2014-01-01T00:00:00.000Z_2015-01-01T00:00:00.000Z_version4",
100L,
0L, //partition_num
1L, //num_replicas
@@ -543,7 +489,7 @@ public class SystemSchemaTest extends CalciteTestBase
verifyRow(
rows.get(4),
- "test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5",
+ "test5_2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z_version5",
100L,
0L, //partition_num
1L, //num_replicas
@@ -556,8 +502,8 @@ public class SystemSchemaTest extends CalciteTestBase
// wikipedia segments are published and unavailable, num_replicas is 0
verifyRow(
rows.get(5),
-
"wikipedia1_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z",
- 47406L,
+
"wikipedia1_2007-01-01T00:00:00.000Z_2008-01-01T00:00:00.000Z_version1",
+ 53000L,
0L, //partition_num
0L, //num_replicas
0L, //numRows
@@ -568,8 +514,8 @@ public class SystemSchemaTest extends CalciteTestBase
verifyRow(
rows.get(6),
-
"wikipedia2_2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z_2018-08-07T18:00:00.117Z",
- 83846L,
+
"wikipedia2_2008-01-01T00:00:00.000Z_2009-01-01T00:00:00.000Z_version2",
+ 83000L,
0L, //partition_num
0L, //num_replicas
0L, //numRows
@@ -580,8 +526,8 @@ public class SystemSchemaTest extends CalciteTestBase
verifyRow(
rows.get(7),
-
"wikipedia3_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z",
- 53527L,
+
"wikipedia3_2009-01-01T00:00:00.000Z_2010-01-01T00:00:00.000Z_version3",
+ 47000L,
0L, //partition_num
0L, //num_replicas
0L, //numRows
@@ -736,11 +682,11 @@ public class SystemSchemaTest extends CalciteTestBase
Object[] row3 = rows.get(3);
Assert.assertEquals("server2:1234", row3[0]);
-
Assert.assertEquals("test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4",
row3[1].toString());
+
Assert.assertEquals("test4_2014-01-01T00:00:00.000Z_2015-01-01T00:00:00.000Z_version4",
row3[1].toString());
Object[] row4 = rows.get(4);
Assert.assertEquals("server2:1234", row4[0]);
-
Assert.assertEquals("test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5",
row4[1].toString());
+
Assert.assertEquals("test5_2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z_version5",
row4[1].toString());
// Verify value types.
verifyTypes(rows, SystemSchema.SERVER_SEGMENTS_SIGNATURE);
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 359ca25..5beafb7 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -31,6 +31,7 @@ import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import org.apache.curator.x.discovery.ServiceProvider;
+import org.apache.druid.client.BrokerSegmentWatcherConfig;
import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
import org.apache.druid.data.input.InputRow;
@@ -104,6 +105,7 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryLifecycleFactory;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.server.log.NoopRequestLogger;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AllowAllAuthenticator;
@@ -123,6 +125,7 @@ import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.schema.DruidSchema;
+import org.apache.druid.sql.calcite.schema.MetadataSegmentView;
import org.apache.druid.sql.calcite.schema.SystemSchema;
import org.apache.druid.sql.calcite.view.NoopViewManager;
import org.apache.druid.sql.calcite.view.ViewManager;
@@ -739,7 +742,8 @@ public class CalciteTests
public static SystemSchema createMockSystemSchema(
final DruidSchema druidSchema,
- final SpecificSegmentsQuerySegmentWalker walker
+ final SpecificSegmentsQuerySegmentWalker walker,
+ final PlannerConfig plannerConfig
)
{
final DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
@@ -753,6 +757,13 @@ public class CalciteTests
};
final SystemSchema schema = new SystemSchema(
druidSchema,
+ new MetadataSegmentView(
+ druidLeaderClient,
+ getJsonMapper(),
+ new BytesAccumulatingResponseHandler(),
+ new BrokerSegmentWatcherConfig(),
+ plannerConfig
+ ),
new TestServerInventoryView(walker.getSegments()),
TEST_AUTHORIZER_MAPPER,
druidLeaderClient,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]