This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 1930ad1 Implement configurable internally generated query context
(#11429)
1930ad1 is described below
commit 1930ad1f47126210524a1a9b0bd56627355b51e0
Author: Lucas Capistrant <[email protected]>
AuthorDate: Wed Oct 6 11:02:41 2021 -0500
Implement configurable internally generated query context (#11429)
* Add the ability to add a context to internally generated druid broker
queries
* fix docs
* changes after first CI failure
* cleanup after merge with master
* change default to empty map and improve unit tests
* add doc info and fix checkstyle
* refactor DruidSchema#runSegmentMetadataQuery and add a unit test
---
docs/configuration/index.md | 14 +++
.../druid/client/BrokerInternalQueryConfig.java | 41 +++++++
.../client/BrokerInternalQueryConfigTest.java | 121 ++++++++++++++++++++
.../main/java/org/apache/druid/cli/CliBroker.java | 2 +
.../druid/sql/calcite/schema/DruidSchema.java | 32 ++++--
.../calcite/schema/DruidSchemaConcurrencyTest.java | 7 +-
.../calcite/schema/DruidSchemaNoDataInitTest.java | 4 +-
.../druid/sql/calcite/schema/DruidSchemaTest.java | 122 ++++++++++++++++++---
.../druid/sql/calcite/schema/SystemSchemaTest.java | 4 +-
.../druid/sql/calcite/util/CalciteTests.java | 4 +-
10 files changed, 320 insertions(+), 31 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 001bb40..1d20029 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1781,6 +1781,20 @@ line.
See [general query configuration](#general-query-configuration).
+###### Broker Generated Query Configuration Supplementation
+
+The Broker generates queries internally. This configuration section describes
how an operator can augment the configuration
+of these queries.
+
+As of now the only supported augmentation is overriding the default query
context. This allows an operator the flexibility
+to adjust it as they see fit. A common use of this configuration is to
override the query priority of the cluster generated
+queries in order to avoid running as a default priority of 0.
+
+|Property|Description|Default|
+|--------|-----------|-------|
+|`druid.broker.internal.query.config.context`|A string formatted `key:value`
map of a query context to add to internally generated broker queries.|null|
+
+
#### SQL
The Druid SQL server is configured through the following properties on the
Broker.
diff --git
a/server/src/main/java/org/apache/druid/client/BrokerInternalQueryConfig.java
b/server/src/main/java/org/apache/druid/client/BrokerInternalQueryConfig.java
new file mode 100644
index 0000000..9b89377
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/client/BrokerInternalQueryConfig.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class contains configuration that internally generated Druid queries
+ * should add to their query payload. The runtime properties for this class
+ * have the prefix "druid.broker.internal.query.config."
+ */
+public class BrokerInternalQueryConfig
+{
+ @JsonProperty
+ private Map<String, Object> context = new HashMap<>();
+
+ public Map<String, Object> getContext()
+ {
+ return context;
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/client/BrokerInternalQueryConfigTest.java
b/server/src/test/java/org/apache/druid/client/BrokerInternalQueryConfigTest.java
new file mode 100644
index 0000000..24b61bb
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/client/BrokerInternalQueryConfigTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.core.io.JsonEOFException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import org.apache.druid.guice.ConfigModule;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class BrokerInternalQueryConfigTest
+{
+ private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
+
+ @Test
+ public void testSerde() throws Exception
+ {
+ //defaults
+ String json = "{}";
+
+ BrokerInternalQueryConfig config = MAPPER.readValue(
+ MAPPER.writeValueAsString(
+ MAPPER.readValue(json, BrokerInternalQueryConfig.class)
+ ),
+ BrokerInternalQueryConfig.class
+ );
+
+ Assert.assertEquals(ImmutableMap.of(), config.getContext());
+
+ //non-defaults
+ json = "{ \"context\": {\"priority\": 5}}";
+
+ config = MAPPER.readValue(
+ MAPPER.writeValueAsString(
+ MAPPER.readValue(json, BrokerInternalQueryConfig.class)
+ ),
+ BrokerInternalQueryConfig.class
+ );
+
+ Map<String, Object> expected = new HashMap<>();
+ expected.put("priority", 5);
+ Assert.assertEquals(expected, config.getContext());
+ }
+
+ /**
+ * Malformatted configuration will trigger an exception and fail to startup
the service
+ *
+ * @throws Exception
+ */
+ @Test(expected = JsonEOFException.class)
+ public void testMalfomattedContext() throws Exception
+ {
+ String malformedJson = "{\"priority: 5}";
+ MAPPER.readValue(
+ MAPPER.writeValueAsString(
+ MAPPER.readValue(malformedJson, BrokerInternalQueryConfig.class)
+ ),
+ BrokerInternalQueryConfig.class
+ );
+ }
+
+ /**
+ * Test the behavior if the operator does not specify anything for
druid.broker.internal.query.config.context in runtime.properties
+ */
+ @Test
+ public void testDefaultBehavior()
+ {
+ Injector injector = Guice.createInjector(
+ new Module()
+ {
+ @Override
+ public void configure(Binder binder)
+ {
+ binder.install(new ConfigModule());
+ binder.install(new DruidGuiceExtensions());
+ JsonConfigProvider.bind(binder,
"druid.broker.internal.query.config", BrokerInternalQueryConfig.class);
+ }
+
+ @Provides
+ @LazySingleton
+ public ObjectMapper jsonMapper()
+ {
+ return new DefaultObjectMapper();
+ }
+ }
+ );
+ BrokerInternalQueryConfig config =
injector.getInstance(BrokerInternalQueryConfig.class);
+ Assert.assertEquals(ImmutableMap.of(), config.getContext());
+ }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java
b/services/src/main/java/org/apache/druid/cli/CliBroker.java
index 5ed03aa..cb27905 100644
--- a/services/src/main/java/org/apache/druid/cli/CliBroker.java
+++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java
@@ -25,6 +25,7 @@ import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.airlift.airline.Command;
+import org.apache.druid.client.BrokerInternalQueryConfig;
import org.apache.druid.client.BrokerSegmentWatcherConfig;
import org.apache.druid.client.BrokerServerView;
import org.apache.druid.client.CachingClusteredClient;
@@ -126,6 +127,7 @@ public class CliBroker extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.broker.balancer",
ServerSelectorStrategy.class);
JsonConfigProvider.bind(binder, "druid.broker.retryPolicy",
RetryQueryRunnerConfig.class);
JsonConfigProvider.bind(binder, "druid.broker.segment",
BrokerSegmentWatcherConfig.class);
+ JsonConfigProvider.bind(binder,
"druid.broker.internal.query.config", BrokerInternalQueryConfig.class);
binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class);
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index e5846a9..70e154f 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -32,6 +32,7 @@ import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.druid.client.BrokerInternalQueryConfig;
import org.apache.druid.client.ServerView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.guice.ManageLifecycle;
@@ -60,7 +61,6 @@ import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.Access;
-import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.table.DruidTable;
@@ -185,6 +185,9 @@ public class DruidSchema extends AbstractSchema
@GuardedBy("lock")
private final TreeSet<SegmentId> segmentsNeedingRefresh = new
TreeSet<>(SEGMENT_ORDER);
+ // Configured context to attach to internally generated queries.
+ private final BrokerInternalQueryConfig brokerInternalQueryConfig;
+
@GuardedBy("lock")
private boolean refreshImmediately = false;
@@ -205,7 +208,8 @@ public class DruidSchema extends AbstractSchema
final SegmentManager segmentManager,
final JoinableFactory joinableFactory,
final PlannerConfig config,
- final Escalator escalator
+ final Escalator escalator,
+ final BrokerInternalQueryConfig brokerInternalQueryConfig
)
{
this.queryLifecycleFactory =
Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory");
@@ -216,6 +220,7 @@ public class DruidSchema extends AbstractSchema
this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d");
this.callbackExec = Execs.singleThreaded("DruidSchema-Callback-%d");
this.escalator = escalator;
+ this.brokerInternalQueryConfig = brokerInternalQueryConfig;
serverView.registerTimelineCallback(
callbackExec,
@@ -674,9 +679,7 @@ public class DruidSchema extends AbstractSchema
final Set<SegmentId> retVal = new HashSet<>();
final Sequence<SegmentAnalysis> sequence = runSegmentMetadataQuery(
- queryLifecycleFactory,
- Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY),
- escalator.createEscalatedAuthenticationResult()
+ Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY)
);
Yielder<SegmentAnalysis> yielder = Yielders.each(sequence);
@@ -835,10 +838,15 @@ public class DruidSchema extends AbstractSchema
}
}
- private static Sequence<SegmentAnalysis> runSegmentMetadataQuery(
- final QueryLifecycleFactory queryLifecycleFactory,
- final Iterable<SegmentId> segments,
- final AuthenticationResult authenticationResult
+ /**
+ * Execute a SegmentMetadata query and return a {@link Sequence} of {@link
SegmentAnalysis}.
+ *
+ * @param segments Iterable of {@link SegmentId} objects that are subject of
the SegmentMetadata query.
+ * @return {@link Sequence} of {@link SegmentAnalysis} objects
+ */
+ @VisibleForTesting
+ Sequence<SegmentAnalysis> runSegmentMetadataQuery(
+ final Iterable<SegmentId> segments
)
{
// Sanity check: getOnlyElement of a set, to ensure all segments have the
same dataSource.
@@ -857,13 +865,15 @@ public class DruidSchema extends AbstractSchema
querySegmentSpec,
new AllColumnIncluderator(),
false,
- ImmutableMap.of(),
+ brokerInternalQueryConfig.getContext(),
EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
false,
false
);
- return queryLifecycleFactory.factorize().runSimple(segmentMetadataQuery,
authenticationResult, Access.OK);
+ return queryLifecycleFactory
+ .factorize()
+ .runSimple(segmentMetadataQuery,
escalator.createEscalatedAuthenticationResult(), Access.OK);
}
private static RowSignature analysisToRowSignature(final SegmentAnalysis
analysis)
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java
index 129bf9d..6f0d0b4 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
+import org.apache.druid.client.BrokerInternalQueryConfig;
import org.apache.druid.client.BrokerSegmentWatcherConfig;
import org.apache.druid.client.BrokerServerView;
import org.apache.druid.client.DruidServer;
@@ -135,7 +136,8 @@ public class DruidSchemaConcurrencyTest extends
DruidSchemaTestCommon
segmentManager,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
- new NoopEscalator()
+ new NoopEscalator(),
+ new BrokerInternalQueryConfig()
)
{
@Override
@@ -238,7 +240,8 @@ public class DruidSchemaConcurrencyTest extends
DruidSchemaTestCommon
segmentManager,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
- new NoopEscalator()
+ new NoopEscalator(),
+ new BrokerInternalQueryConfig()
)
{
@Override
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java
index 96a464e..b2c1981 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.schema;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.client.BrokerInternalQueryConfig;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.join.MapJoinableFactory;
@@ -57,7 +58,8 @@ public class DruidSchemaNoDataInitTest extends CalciteTestBase
new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
- new NoopEscalator()
+ new NoopEscalator(),
+ new BrokerInternalQueryConfig()
);
druidSchema.start();
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
index e2a7554..2bfd3ca 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.sql.calcite.schema;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -28,6 +29,7 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.client.BrokerInternalQueryConfig;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
@@ -37,13 +39,21 @@ import
org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import
org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import org.apache.druid.query.metadata.metadata.AllColumnIncluderator;
+import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
+import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.join.MapJoinableFactory;
import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.QueryLifecycle;
+import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.AllowAllAuthenticator;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.sql.calcite.util.CalciteTests;
@@ -54,6 +64,7 @@ import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -61,6 +72,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -77,6 +89,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
private DruidSchema schema2 = null;
private CountDownLatch buildTableLatch = new CountDownLatch(1);
private CountDownLatch markDataSourceLatch = new CountDownLatch(1);
+ private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
@Before
public void setUp() throws Exception
@@ -158,9 +171,13 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
serverView,
segmentManager,
- new MapJoinableFactory(ImmutableSet.of(globalTableJoinable),
ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class)),
+ new MapJoinableFactory(
+ ImmutableSet.of(globalTableJoinable),
+ ImmutableMap.of(globalTableJoinable.getClass(),
GlobalTableDataSource.class)
+ ),
PLANNER_CONFIG_DEFAULT,
- new NoopEscalator()
+ new NoopEscalator(),
+ new BrokerInternalQueryConfig()
)
{
@Override
@@ -185,11 +202,13 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
segmentManager,
new MapJoinableFactory(ImmutableSet.of(globalTableJoinable),
ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class)),
PLANNER_CONFIG_DEFAULT,
- new NoopEscalator()
+ new NoopEscalator(),
+ new BrokerInternalQueryConfig()
)
{
boolean throwException = true;
+
@Override
protected DruidTable buildDruidTable(String dataSource)
{
@@ -414,9 +433,9 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
Assert.assertEquals(1L, metadata.isRealtime());
// get the historical server
final ImmutableDruidServer historicalServer = druidServers.stream()
- .filter(s ->
s.getType().equals(ServerType.HISTORICAL))
- .findAny()
- .orElse(null);
+ .filter(s ->
s.getType().equals(ServerType.HISTORICAL))
+ .findAny()
+ .orElse(null);
Assert.assertNotNull(historicalServer);
final DruidServerMetadata historicalServerMetadata =
historicalServer.getMetadata();
@@ -461,7 +480,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
segmentManager,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
- new NoopEscalator()
+ new NoopEscalator(),
+ new BrokerInternalQueryConfig()
)
{
@Override
@@ -502,7 +522,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
segmentManager,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
- new NoopEscalator()
+ new NoopEscalator(),
+ new BrokerInternalQueryConfig()
)
{
@Override
@@ -547,7 +568,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
segmentManager,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
- new NoopEscalator()
+ new NoopEscalator(),
+ new BrokerInternalQueryConfig()
)
{
@Override
@@ -589,7 +611,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
segmentManager,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
- new NoopEscalator()
+ new NoopEscalator(),
+ new BrokerInternalQueryConfig()
)
{
@Override
@@ -628,7 +651,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
segmentManager,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
- new NoopEscalator()
+ new NoopEscalator(),
+ new BrokerInternalQueryConfig()
)
{
@Override
@@ -684,7 +708,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
segmentManager,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
- new NoopEscalator()
+ new NoopEscalator(),
+ new BrokerInternalQueryConfig()
)
{
@Override
@@ -743,7 +768,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
segmentManager,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
- new NoopEscalator()
+ new NoopEscalator(),
+ new BrokerInternalQueryConfig()
)
{
@Override
@@ -776,7 +802,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
segmentManager,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
- new NoopEscalator()
+ new NoopEscalator(),
+ new BrokerInternalQueryConfig()
)
{
@Override
@@ -822,7 +849,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
segmentManager,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
- new NoopEscalator()
+ new NoopEscalator(),
+ new BrokerInternalQueryConfig()
)
{
@Override
@@ -995,6 +1023,70 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
Assert.assertFalse(fooTable.isJoinable());
}
+ /**
+ * Ensure that the BrokerInternalQueryConfig context is honored for this
internally generated SegmentMetadata Query
+ */
+ @Test
+ public void testRunSegmentMetadataQueryWithContext() throws Exception
+ {
+ Map<String, Object> queryContext = ImmutableMap.of("priority", 5);
+
+ String brokerInternalQueryConfigJson = "{\"context\": { \"priority\": 5}
}";
+
+ TestHelper.makeJsonMapper();
+ BrokerInternalQueryConfig brokerInternalQueryConfig = MAPPER.readValue(
+ MAPPER.writeValueAsString(
+ MAPPER.readValue(brokerInternalQueryConfigJson,
BrokerInternalQueryConfig.class)
+ ),
+ BrokerInternalQueryConfig.class
+ );
+
+ DataSegment segment = newSegment("test", 0);
+ List<SegmentId> segmentIterable = ImmutableList.of(segment.getId());
+
+ // This is the query that we expect this method to create. We will be
testing that it matches the query generated by the method under test.
+ SegmentMetadataQuery expectedMetadataQuery = new SegmentMetadataQuery(
+ new TableDataSource(segment.getDataSource()),
+ new MultipleSpecificSegmentSpec(
+ segmentIterable.stream()
+
.map(SegmentId::toDescriptor).collect(Collectors.toList())),
+ new AllColumnIncluderator(),
+ false,
+ queryContext,
+ EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
+ false,
+ false
+ );
+
+ QueryLifecycleFactory factoryMock =
EasyMock.createMock(QueryLifecycleFactory.class);
+ QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class);
+
+ // Need to create schema for this test because the available schemas don't
mock the QueryLifecycleFactory, which I need for this test.
+ DruidSchema mySchema = new DruidSchema(
+ factoryMock,
+ serverView,
+ segmentManager,
+ new MapJoinableFactory(
+ ImmutableSet.of(globalTableJoinable),
+ ImmutableMap.of(globalTableJoinable.getClass(),
GlobalTableDataSource.class)
+ ),
+ PLANNER_CONFIG_DEFAULT,
+ new NoopEscalator(),
+ brokerInternalQueryConfig
+ );
+
+ EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once();
+ // This is the mat of the test, making sure that the query created by the
method under test matches the expected query, specifically the operator
configured context
+ EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery,
AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK)).andReturn(null);
+
+ EasyMock.replay(factoryMock, lifecycleMock);
+
+ mySchema.runSegmentMetadataQuery(segmentIterable);
+
+ EasyMock.verify(factoryMock, lifecycleMock);
+
+ }
+
private static DataSegment newSegment(String datasource, int partitionId)
{
return new DataSegment(
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index ed6599a..d25a902 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.client.BrokerInternalQueryConfig;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
@@ -255,7 +256,8 @@ public class SystemSchemaTest extends CalciteTestBase
new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
- new NoopEscalator()
+ new NoopEscalator(),
+ new BrokerInternalQueryConfig()
);
druidSchema.start();
druidSchema.awaitInitialization();
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index ca70e1d..79fc673 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -31,6 +31,7 @@ import com.google.inject.Injector;
import com.google.inject.Key;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.druid.client.BrokerInternalQueryConfig;
import org.apache.druid.client.BrokerSegmentWatcherConfig;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ServerInventoryView;
@@ -1243,7 +1244,8 @@ public class CalciteTests
},
createDefaultJoinableFactory(),
plannerConfig,
- TEST_AUTHENTICATOR_ESCALATOR
+ TEST_AUTHENTICATOR_ESCALATOR,
+ new BrokerInternalQueryConfig()
);
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]