This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1930ad1  Implement configurable internally generated query context 
(#11429)
1930ad1 is described below

commit 1930ad1f47126210524a1a9b0bd56627355b51e0
Author: Lucas Capistrant <[email protected]>
AuthorDate: Wed Oct 6 11:02:41 2021 -0500

    Implement configurable internally generated query context (#11429)
    
    * Add the ability to add a context to internally generated druid broker 
queries
    
    * fix docs
    
    * changes after first CI failure
    
    * cleanup after merge with master
    
    * change default to empty map and improve unit tests
    
    * add doc info and fix checkstyle
    
    * refactor DruidSchema#runSegmentMetadataQuery and add a unit test
---
 docs/configuration/index.md                        |  14 +++
 .../druid/client/BrokerInternalQueryConfig.java    |  41 +++++++
 .../client/BrokerInternalQueryConfigTest.java      | 121 ++++++++++++++++++++
 .../main/java/org/apache/druid/cli/CliBroker.java  |   2 +
 .../druid/sql/calcite/schema/DruidSchema.java      |  32 ++++--
 .../calcite/schema/DruidSchemaConcurrencyTest.java |   7 +-
 .../calcite/schema/DruidSchemaNoDataInitTest.java  |   4 +-
 .../druid/sql/calcite/schema/DruidSchemaTest.java  | 122 ++++++++++++++++++---
 .../druid/sql/calcite/schema/SystemSchemaTest.java |   4 +-
 .../druid/sql/calcite/util/CalciteTests.java       |   4 +-
 10 files changed, 320 insertions(+), 31 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 001bb40..1d20029 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1781,6 +1781,20 @@ line.
 
 See [general query configuration](#general-query-configuration).
 
+###### Broker Generated Query Configuration Supplementation
+
+The Broker generates queries internally. This configuration section describes 
how an operator can augment the configuration
+of these queries.
+
+As of now the only supported augmentation is overriding the default query 
context. This allows an operator the flexibility
+to adjust it as they see fit. A common use of this configuration is to 
override the query priority of the cluster generated
+queries in order to avoid running as a default priority of 0.
+
+|Property|Description|Default|
+|--------|-----------|-------|
+|`druid.broker.internal.query.config.context`|A string formatted `key:value` 
map of a query context to add to internally generated broker queries.|null|
+
+
 #### SQL
 
 The Druid SQL server is configured through the following properties on the 
Broker.
diff --git 
a/server/src/main/java/org/apache/druid/client/BrokerInternalQueryConfig.java 
b/server/src/main/java/org/apache/druid/client/BrokerInternalQueryConfig.java
new file mode 100644
index 0000000..9b89377
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/client/BrokerInternalQueryConfig.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class contains configuration that internally generated Druid queries
+ * should add to their query payload. The runtime properties for this class
+ * have the prefix "druid.broker.internal.query.config."
+ */
+public class BrokerInternalQueryConfig
+{
+  @JsonProperty
+  private Map<String, Object> context = new HashMap<>();
+
+  public Map<String, Object> getContext()
+  {
+    return context;
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/client/BrokerInternalQueryConfigTest.java
 
b/server/src/test/java/org/apache/druid/client/BrokerInternalQueryConfigTest.java
new file mode 100644
index 0000000..24b61bb
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/client/BrokerInternalQueryConfigTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.core.io.JsonEOFException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import org.apache.druid.guice.ConfigModule;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class BrokerInternalQueryConfigTest
+{
+  private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
+
+  @Test
+  public void testSerde() throws Exception
+  {
+    //defaults
+    String json = "{}";
+
+    BrokerInternalQueryConfig config = MAPPER.readValue(
+        MAPPER.writeValueAsString(
+            MAPPER.readValue(json, BrokerInternalQueryConfig.class)
+        ),
+        BrokerInternalQueryConfig.class
+    );
+
+    Assert.assertEquals(ImmutableMap.of(), config.getContext());
+
+    //non-defaults
+    json = "{ \"context\": {\"priority\": 5}}";
+
+    config = MAPPER.readValue(
+        MAPPER.writeValueAsString(
+            MAPPER.readValue(json, BrokerInternalQueryConfig.class)
+        ),
+        BrokerInternalQueryConfig.class
+    );
+
+    Map<String, Object> expected = new HashMap<>();
+    expected.put("priority", 5);
+    Assert.assertEquals(expected, config.getContext());
+  }
+
+  /**
+   * Malformatted configuration will trigger an exception and fail to startup 
the service
+   *
+   * @throws Exception
+   */
+  @Test(expected = JsonEOFException.class)
+  public void testMalfomattedContext() throws Exception
+  {
+    String malformedJson = "{\"priority: 5}";
+    MAPPER.readValue(
+        MAPPER.writeValueAsString(
+            MAPPER.readValue(malformedJson, BrokerInternalQueryConfig.class)
+        ),
+        BrokerInternalQueryConfig.class
+    );
+  }
+
+  /**
+   * Test the behavior if the operator does not specify anything for 
druid.broker.internal.query.config.context in runtime.properties
+   */
+  @Test
+  public void testDefaultBehavior()
+  {
+    Injector injector = Guice.createInjector(
+        new Module()
+        {
+          @Override
+          public void configure(Binder binder)
+          {
+            binder.install(new ConfigModule());
+            binder.install(new DruidGuiceExtensions());
+            JsonConfigProvider.bind(binder, 
"druid.broker.internal.query.config", BrokerInternalQueryConfig.class);
+          }
+
+          @Provides
+          @LazySingleton
+          public ObjectMapper jsonMapper()
+          {
+            return new DefaultObjectMapper();
+          }
+        }
+    );
+    BrokerInternalQueryConfig config = 
injector.getInstance(BrokerInternalQueryConfig.class);
+    Assert.assertEquals(ImmutableMap.of(), config.getContext());
+  }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java 
b/services/src/main/java/org/apache/druid/cli/CliBroker.java
index 5ed03aa..cb27905 100644
--- a/services/src/main/java/org/apache/druid/cli/CliBroker.java
+++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java
@@ -25,6 +25,7 @@ import com.google.inject.Key;
 import com.google.inject.Module;
 import com.google.inject.name.Names;
 import io.airlift.airline.Command;
+import org.apache.druid.client.BrokerInternalQueryConfig;
 import org.apache.druid.client.BrokerSegmentWatcherConfig;
 import org.apache.druid.client.BrokerServerView;
 import org.apache.druid.client.CachingClusteredClient;
@@ -126,6 +127,7 @@ public class CliBroker extends ServerRunnable
           JsonConfigProvider.bind(binder, "druid.broker.balancer", 
ServerSelectorStrategy.class);
           JsonConfigProvider.bind(binder, "druid.broker.retryPolicy", 
RetryQueryRunnerConfig.class);
           JsonConfigProvider.bind(binder, "druid.broker.segment", 
BrokerSegmentWatcherConfig.class);
+          JsonConfigProvider.bind(binder, 
"druid.broker.internal.query.config", BrokerInternalQueryConfig.class);
 
           
binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class);
 
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index e5846a9..70e154f 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -32,6 +32,7 @@ import com.google.errorprone.annotations.concurrent.GuardedBy;
 import com.google.inject.Inject;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.druid.client.BrokerInternalQueryConfig;
 import org.apache.druid.client.ServerView;
 import org.apache.druid.client.TimelineServerView;
 import org.apache.druid.guice.ManageLifecycle;
@@ -60,7 +61,6 @@ import org.apache.druid.server.SegmentManager;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.security.Access;
-import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.Escalator;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.sql.calcite.table.DruidTable;
@@ -185,6 +185,9 @@ public class DruidSchema extends AbstractSchema
   @GuardedBy("lock")
   private final TreeSet<SegmentId> segmentsNeedingRefresh = new 
TreeSet<>(SEGMENT_ORDER);
 
+  // Configured context to attach to internally generated queries.
+  private final BrokerInternalQueryConfig brokerInternalQueryConfig;
+
   @GuardedBy("lock")
   private boolean refreshImmediately = false;
 
@@ -205,7 +208,8 @@ public class DruidSchema extends AbstractSchema
       final SegmentManager segmentManager,
       final JoinableFactory joinableFactory,
       final PlannerConfig config,
-      final Escalator escalator
+      final Escalator escalator,
+      final BrokerInternalQueryConfig brokerInternalQueryConfig
   )
   {
     this.queryLifecycleFactory = 
Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory");
@@ -216,6 +220,7 @@ public class DruidSchema extends AbstractSchema
     this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d");
     this.callbackExec = Execs.singleThreaded("DruidSchema-Callback-%d");
     this.escalator = escalator;
+    this.brokerInternalQueryConfig = brokerInternalQueryConfig;
 
     serverView.registerTimelineCallback(
         callbackExec,
@@ -674,9 +679,7 @@ public class DruidSchema extends AbstractSchema
 
     final Set<SegmentId> retVal = new HashSet<>();
     final Sequence<SegmentAnalysis> sequence = runSegmentMetadataQuery(
-        queryLifecycleFactory,
-        Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY),
-        escalator.createEscalatedAuthenticationResult()
+        Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY)
     );
 
     Yielder<SegmentAnalysis> yielder = Yielders.each(sequence);
@@ -835,10 +838,15 @@ public class DruidSchema extends AbstractSchema
     }
   }
 
-  private static Sequence<SegmentAnalysis> runSegmentMetadataQuery(
-      final QueryLifecycleFactory queryLifecycleFactory,
-      final Iterable<SegmentId> segments,
-      final AuthenticationResult authenticationResult
+  /**
+   * Execute a SegmentMetadata query and return a {@link Sequence} of {@link 
SegmentAnalysis}.
+   *
+   * @param segments Iterable of {@link SegmentId} objects that are subject of 
the SegmentMetadata query.
+   * @return {@link Sequence} of {@link SegmentAnalysis} objects
+   */
+  @VisibleForTesting
+  Sequence<SegmentAnalysis> runSegmentMetadataQuery(
+      final Iterable<SegmentId> segments
   )
   {
     // Sanity check: getOnlyElement of a set, to ensure all segments have the 
same dataSource.
@@ -857,13 +865,15 @@ public class DruidSchema extends AbstractSchema
         querySegmentSpec,
         new AllColumnIncluderator(),
         false,
-        ImmutableMap.of(),
+        brokerInternalQueryConfig.getContext(),
         EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
         false,
         false
     );
 
-    return queryLifecycleFactory.factorize().runSimple(segmentMetadataQuery, 
authenticationResult, Access.OK);
+    return queryLifecycleFactory
+        .factorize()
+        .runSimple(segmentMetadataQuery, 
escalator.createEscalatedAuthenticationResult(), Access.OK);
   }
 
   private static RowSignature analysisToRowSignature(final SegmentAnalysis 
analysis)
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java
index 129bf9d..6f0d0b4 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaConcurrencyTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import org.apache.druid.client.BrokerInternalQueryConfig;
 import org.apache.druid.client.BrokerSegmentWatcherConfig;
 import org.apache.druid.client.BrokerServerView;
 import org.apache.druid.client.DruidServer;
@@ -135,7 +136,8 @@ public class DruidSchemaConcurrencyTest extends 
DruidSchemaTestCommon
         segmentManager,
         new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
         PLANNER_CONFIG_DEFAULT,
-        new NoopEscalator()
+        new NoopEscalator(),
+        new BrokerInternalQueryConfig()
     )
     {
       @Override
@@ -238,7 +240,8 @@ public class DruidSchemaConcurrencyTest extends 
DruidSchemaTestCommon
         segmentManager,
         new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
         PLANNER_CONFIG_DEFAULT,
-        new NoopEscalator()
+        new NoopEscalator(),
+        new BrokerInternalQueryConfig()
     )
     {
       @Override
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java
index 96a464e..b2c1981 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.schema;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.client.BrokerInternalQueryConfig;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.segment.join.MapJoinableFactory;
@@ -57,7 +58,8 @@ public class DruidSchemaNoDataInitTest extends CalciteTestBase
           new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
           new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
           PLANNER_CONFIG_DEFAULT,
-          new NoopEscalator()
+          new NoopEscalator(),
+          new BrokerInternalQueryConfig()
       );
 
       druidSchema.start();
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
index e2a7554..2bfd3ca 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.sql.calcite.schema;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -28,6 +29,7 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.client.BrokerInternalQueryConfig;
 import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
@@ -37,13 +39,21 @@ import 
org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import 
org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import org.apache.druid.query.metadata.metadata.AllColumnIncluderator;
+import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
+import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.join.MapJoinableFactory;
 import 
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.QueryLifecycle;
+import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.AllowAllAuthenticator;
 import org.apache.druid.server.security.NoopEscalator;
 import org.apache.druid.sql.calcite.table.DruidTable;
 import org.apache.druid.sql.calcite.util.CalciteTests;
@@ -54,6 +64,7 @@ import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -61,6 +72,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -77,6 +89,7 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
   private DruidSchema schema2 = null;
   private CountDownLatch buildTableLatch = new CountDownLatch(1);
   private CountDownLatch markDataSourceLatch = new CountDownLatch(1);
+  private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
 
   @Before
   public void setUp() throws Exception
@@ -158,9 +171,13 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
         serverView,
         segmentManager,
-        new MapJoinableFactory(ImmutableSet.of(globalTableJoinable), 
ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class)),
+        new MapJoinableFactory(
+            ImmutableSet.of(globalTableJoinable),
+            ImmutableMap.of(globalTableJoinable.getClass(), 
GlobalTableDataSource.class)
+        ),
         PLANNER_CONFIG_DEFAULT,
-        new NoopEscalator()
+        new NoopEscalator(),
+        new BrokerInternalQueryConfig()
     )
     {
       @Override
@@ -185,11 +202,13 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
         segmentManager,
         new MapJoinableFactory(ImmutableSet.of(globalTableJoinable), 
ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class)),
         PLANNER_CONFIG_DEFAULT,
-        new NoopEscalator()
+        new NoopEscalator(),
+        new BrokerInternalQueryConfig()
     )
     {
 
       boolean throwException = true;
+
       @Override
       protected DruidTable buildDruidTable(String dataSource)
       {
@@ -414,9 +433,9 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
     Assert.assertEquals(1L, metadata.isRealtime());
     // get the historical server
     final ImmutableDruidServer historicalServer = druidServers.stream()
-                                                        .filter(s -> 
s.getType().equals(ServerType.HISTORICAL))
-                                                        .findAny()
-                                                        .orElse(null);
+                                                              .filter(s -> 
s.getType().equals(ServerType.HISTORICAL))
+                                                              .findAny()
+                                                              .orElse(null);
 
     Assert.assertNotNull(historicalServer);
     final DruidServerMetadata historicalServerMetadata = 
historicalServer.getMetadata();
@@ -461,7 +480,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
         segmentManager,
         new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
         PLANNER_CONFIG_DEFAULT,
-        new NoopEscalator()
+        new NoopEscalator(),
+        new BrokerInternalQueryConfig()
     )
     {
       @Override
@@ -502,7 +522,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
         segmentManager,
         new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
         PLANNER_CONFIG_DEFAULT,
-        new NoopEscalator()
+        new NoopEscalator(),
+        new BrokerInternalQueryConfig()
     )
     {
       @Override
@@ -547,7 +568,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
         segmentManager,
         new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
         PLANNER_CONFIG_DEFAULT,
-        new NoopEscalator()
+        new NoopEscalator(),
+        new BrokerInternalQueryConfig()
     )
     {
       @Override
@@ -589,7 +611,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
         segmentManager,
         new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
         PLANNER_CONFIG_DEFAULT,
-        new NoopEscalator()
+        new NoopEscalator(),
+        new BrokerInternalQueryConfig()
     )
     {
       @Override
@@ -628,7 +651,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
         segmentManager,
         new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
         PLANNER_CONFIG_DEFAULT,
-        new NoopEscalator()
+        new NoopEscalator(),
+        new BrokerInternalQueryConfig()
     )
     {
       @Override
@@ -684,7 +708,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
         segmentManager,
         new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
         PLANNER_CONFIG_DEFAULT,
-        new NoopEscalator()
+        new NoopEscalator(),
+        new BrokerInternalQueryConfig()
     )
     {
       @Override
@@ -743,7 +768,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
         segmentManager,
         new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
         PLANNER_CONFIG_DEFAULT,
-        new NoopEscalator()
+        new NoopEscalator(),
+        new BrokerInternalQueryConfig()
     )
     {
       @Override
@@ -776,7 +802,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
         segmentManager,
         new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
         PLANNER_CONFIG_DEFAULT,
-        new NoopEscalator()
+        new NoopEscalator(),
+        new BrokerInternalQueryConfig()
     )
     {
       @Override
@@ -822,7 +849,8 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
         segmentManager,
         new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
         PLANNER_CONFIG_DEFAULT,
-        new NoopEscalator()
+        new NoopEscalator(),
+        new BrokerInternalQueryConfig()
     )
     {
       @Override
@@ -995,6 +1023,70 @@ public class DruidSchemaTest extends DruidSchemaTestCommon
     Assert.assertFalse(fooTable.isJoinable());
   }
 
+  /**
+   * Ensure that the BrokerInternalQueryConfig context is honored for this 
internally generated SegmentMetadata Query
+   */
+  @Test
+  public void testRunSegmentMetadataQueryWithContext() throws Exception
+  {
+    Map<String, Object> queryContext = ImmutableMap.of("priority", 5);
+
+    String brokerInternalQueryConfigJson = "{\"context\": { \"priority\": 5} 
}";
+
+    TestHelper.makeJsonMapper();
+    BrokerInternalQueryConfig brokerInternalQueryConfig = MAPPER.readValue(
+        MAPPER.writeValueAsString(
+            MAPPER.readValue(brokerInternalQueryConfigJson, 
BrokerInternalQueryConfig.class)
+        ),
+        BrokerInternalQueryConfig.class
+    );
+
+    DataSegment segment = newSegment("test", 0);
+    List<SegmentId> segmentIterable = ImmutableList.of(segment.getId());
+
+    // This is the query that we expect this method to create. We will be 
testing that it matches the query generated by the method under test.
+    SegmentMetadataQuery expectedMetadataQuery = new SegmentMetadataQuery(
+        new TableDataSource(segment.getDataSource()),
+        new MultipleSpecificSegmentSpec(
+            segmentIterable.stream()
+                         
.map(SegmentId::toDescriptor).collect(Collectors.toList())),
+        new AllColumnIncluderator(),
+        false,
+        queryContext,
+        EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
+        false,
+        false
+    );
+
+    QueryLifecycleFactory factoryMock = 
EasyMock.createMock(QueryLifecycleFactory.class);
+    QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class);
+
+    // Need to create schema for this test because the available schemas don't 
mock the QueryLifecycleFactory, which I need for this test.
+    DruidSchema mySchema = new DruidSchema(
+        factoryMock,
+        serverView,
+        segmentManager,
+        new MapJoinableFactory(
+            ImmutableSet.of(globalTableJoinable),
+            ImmutableMap.of(globalTableJoinable.getClass(), 
GlobalTableDataSource.class)
+        ),
+        PLANNER_CONFIG_DEFAULT,
+        new NoopEscalator(),
+        brokerInternalQueryConfig
+    );
+
+    EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once();
+    // This is the mat of the test, making sure that the query created by the 
method under test matches the expected query, specifically the operator 
configured context
+    EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, 
AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK)).andReturn(null);
+
+    EasyMock.replay(factoryMock, lifecycleMock);
+
+    mySchema.runSegmentMetadataQuery(segmentIterable);
+
+    EasyMock.verify(factoryMock, lifecycleMock);
+
+  }
+
   private static DataSegment newSegment(String datasource, int partitionId)
   {
     return new DataSegment(
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index ed6599a..d25a902 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.client.BrokerInternalQueryConfig;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.client.ImmutableDruidServer;
@@ -255,7 +256,8 @@ public class SystemSchemaTest extends CalciteTestBase
         new SegmentManager(EasyMock.createMock(SegmentLoader.class)),
         new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
         PLANNER_CONFIG_DEFAULT,
-        new NoopEscalator()
+        new NoopEscalator(),
+        new BrokerInternalQueryConfig()
     );
     druidSchema.start();
     druidSchema.awaitInitialization();
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index ca70e1d..79fc673 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -31,6 +31,7 @@ import com.google.inject.Injector;
 import com.google.inject.Key;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.druid.client.BrokerInternalQueryConfig;
 import org.apache.druid.client.BrokerSegmentWatcherConfig;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.client.ServerInventoryView;
@@ -1243,7 +1244,8 @@ public class CalciteTests
         },
         createDefaultJoinableFactory(),
         plannerConfig,
-        TEST_AUTHENTICATOR_ESCALATOR
+        TEST_AUTHENTICATOR_ESCALATOR,
+        new BrokerInternalQueryConfig()
     );
 
     try {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to