This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 775d654a6c8 Load only the required lookups for MSQ tasks (#16358)
775d654a6c8 is described below
commit 775d654a6c84fc6afbb1935010f11fed1bc35d18
Author: Akshat Jain <[email protected]>
AuthorDate: Thu May 9 11:21:54 2024 +0530
Load only the required lookups for MSQ tasks (#16358)
With the changes in this PR, MSQ tasks (MSQControllerTask and MSQWorkerTask) load
only the required lookups during querying and ingestion, based on the values of the
CTX_LOOKUP_LOADING_MODE and CTX_LOOKUPS_TO_LOAD keys in the query context.
---
.../msq/indexing/IndexerControllerContext.java | 15 +++++
.../druid/msq/indexing/MSQControllerTask.java | 7 +++
.../apache/druid/msq/indexing/MSQWorkerTask.java | 27 +++++++++
.../apache/druid/msq/sql/MSQTaskQueryMaker.java | 9 ++-
.../org/apache/druid/msq/exec/MSQSelectTest.java | 8 ++-
.../druid/msq/indexing/MSQControllerTaskTest.java | 53 ++++++++++++++++++
.../druid/msq/indexing/MSQWorkerTaskTest.java | 65 +++++++++++++++++++++-
.../org/apache/druid/msq/test/MSQTestBase.java | 30 +++++++++-
.../builtin/QueryLookupOperatorConversion.java | 6 +-
.../druid/sql/calcite/planner/PlannerContext.java | 21 +++++++
.../planner/SqlResourceCollectorShuttle.java | 9 ++-
11 files changed, 244 insertions(+), 6 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 3ff71c3e1b7..e8fba09ddc5 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -55,6 +55,7 @@ import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.server.DruidNode;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@@ -269,6 +270,20 @@ public class IndexerControllerContext implements
ControllerContext
.put(MultiStageQueryContext.CTX_IS_REINDEX,
MSQControllerTask.isReplaceInputDataSourceTask(querySpec))
.put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES,
queryKernelConfig.getMaxConcurrentStages());
+ // Put the lookup loading info in the task context to facilitate selective
loading of lookups.
+ if (controllerTaskContext.get(PlannerContext.CTX_LOOKUP_LOADING_MODE) !=
null) {
+ taskContextOverridesBuilder.put(
+ PlannerContext.CTX_LOOKUP_LOADING_MODE,
+ controllerTaskContext.get(PlannerContext.CTX_LOOKUP_LOADING_MODE)
+ );
+ }
+ if (controllerTaskContext.get(PlannerContext.CTX_LOOKUPS_TO_LOAD) != null)
{
+ taskContextOverridesBuilder.put(
+ PlannerContext.CTX_LOOKUPS_TO_LOAD,
+ controllerTaskContext.get(PlannerContext.CTX_LOOKUPS_TO_LOAD)
+ );
+ }
+
if (querySpec.getDestination().toSelectDestination() != null) {
taskContextOverridesBuilder.put(
MultiStageQueryContext.CTX_SELECT_DESTINATION,
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
index 2a435215144..bdaf3964b29 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
@@ -58,6 +58,7 @@ import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.run.SqlResults;
@@ -333,4 +334,10 @@ public class MSQControllerTask extends AbstractTask
implements ClientTaskQuery,
{
return querySpec.getDestination() instanceof DurableStorageMSQDestination;
}
+
+ @Override
+ public LookupLoadingSpec getLookupLoadingSpec()
+ {
+ return LookupLoadingSpec.NONE;
+ }
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
index b4d18ea390e..a23c62881a0 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -37,9 +38,13 @@ import org.apache.druid.msq.exec.MSQTasks;
import org.apache.druid.msq.exec.Worker;
import org.apache.druid.msq.exec.WorkerContext;
import org.apache.druid.msq.exec.WorkerImpl;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nonnull;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -185,4 +190,26 @@ public class MSQWorkerTask extends AbstractTask
{
return Objects.hash(super.hashCode(), controllerTaskId, workerNumber,
retryCount, worker);
}
+
+ @Override
+ public LookupLoadingSpec getLookupLoadingSpec()
+ {
+ final Object lookupModeValue =
getContext().get(PlannerContext.CTX_LOOKUP_LOADING_MODE);
+ if (lookupModeValue == null) {
+ return LookupLoadingSpec.ALL;
+ }
+
+ final LookupLoadingSpec.Mode lookupLoadingMode =
LookupLoadingSpec.Mode.valueOf(lookupModeValue.toString());
+ if (lookupLoadingMode == LookupLoadingSpec.Mode.NONE) {
+ return LookupLoadingSpec.NONE;
+ } else if (lookupLoadingMode == LookupLoadingSpec.Mode.ONLY_REQUIRED) {
+ Collection<String> lookupsToLoad = (Collection<String>)
getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD);
+ if (lookupsToLoad == null || lookupsToLoad.isEmpty()) {
+ throw InvalidInput.exception("Set of lookups to load cannot be %s for
mode[ONLY_REQUIRED].", lookupsToLoad);
+ }
+ return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad));
+ } else {
+ return LookupLoadingSpec.ALL;
+ }
+ }
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index 8cc34547f99..533010c3057 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -52,6 +52,7 @@ import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.QueryResponse;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
@@ -283,6 +284,12 @@ public class MSQTaskQueryMaker implements QueryMaker
MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec);
+ final Map<String, Object> context = new HashMap<>();
+ context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE,
plannerContext.getLookupLoadingSpec().getMode());
+ if (plannerContext.getLookupLoadingSpec().getMode() ==
LookupLoadingSpec.Mode.ONLY_REQUIRED) {
+ context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD,
plannerContext.getLookupLoadingSpec().getLookupsToLoad());
+ }
+
final MSQControllerTask controllerTask = new MSQControllerTask(
taskId,
querySpec.withOverriddenContext(nativeQueryContext),
@@ -291,7 +298,7 @@ public class MSQTaskQueryMaker implements QueryMaker
SqlResults.Context.fromPlannerContext(plannerContext),
sqlTypeNames,
columnTypeList,
- null
+ context
);
FutureUtils.getUnchecked(overlordClient.runTask(taskId, controllerTask),
true);
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
index 56f1ce98696..84dddd526c1 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.exec;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.JsonInputFormat;
@@ -70,6 +71,7 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.filtration.Filtration;
@@ -227,7 +229,9 @@ public class MSQSelectTest extends MSQTestBase
new Object[]{1L, "1"},
new Object[]{1L, "def"},
new Object[]{1L, "abc"}
- )).verifyResults();
+ ))
+ .setExpectedLookupLoadingSpec(LookupLoadingSpec.NONE)
+ .verifyResults();
}
@MethodSource("data")
@@ -742,6 +746,7 @@ public class MSQSelectTest extends MSQTestBase
.build())
.setExpectedRowSignature(rowSignature)
.setExpectedResultRows(ImmutableList.of(new Object[]{4L}))
+
.setExpectedLookupLoadingSpec(LookupLoadingSpec.loadOnly(ImmutableSet.of("lookyloo")))
.verifyResults();
}
@@ -808,6 +813,7 @@ public class MSQSelectTest extends MSQTestBase
new Object[]{"xabc", 1L}
)
)
+
.setExpectedLookupLoadingSpec(LookupLoadingSpec.loadOnly(ImmutableSet.of("lookyloo")))
.verifyResults();
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
index e0eee251f72..a5001fb58eb 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.msq.indexing;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
@@ -34,12 +35,15 @@ import
org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -84,6 +88,55 @@ public class MSQControllerTaskTest
Assert.assertTrue(controllerTask.getInputSourceResources().isEmpty());
}
+ @Test
+ public void testGetDefaultLookupLoadingSpec()
+ {
+ MSQControllerTask controllerTask = new MSQControllerTask(
+ null,
+ MSQ_SPEC,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+ Assert.assertEquals(LookupLoadingSpec.NONE,
controllerTask.getLookupLoadingSpec());
+ }
+
+ @Test
+ public void testGetLookupLoadingSpecUsingLookupLoadingInfoInContext()
+ {
+ MSQSpec build = MSQSpec
+ .builder()
+ .query(new Druids.ScanQueryBuilder()
+ .intervals(new MultipleIntervalSegmentSpec(INTERVALS))
+ .dataSource("target")
+ .context(
+ ImmutableMap.of(
+ PlannerContext.CTX_LOOKUPS_TO_LOAD,
Arrays.asList("lookupName1", "lookupName2"),
+ PlannerContext.CTX_LOOKUP_LOADING_MODE,
LookupLoadingSpec.Mode.ONLY_REQUIRED)
+ )
+ .build()
+ )
+ .columnMappings(new ColumnMappings(Collections.emptyList()))
+ .tuningConfig(MSQTuningConfig.defaultConfig())
+ .build();
+ MSQControllerTask controllerTask = new MSQControllerTask(
+ null,
+ build,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+
+ // Va;idate that MSQ Controller task doesn't load any lookups even if
context has lookup info populated.
+ Assert.assertEquals(LookupLoadingSpec.NONE,
controllerTask.getLookupLoadingSpec());
+ }
+
@Test
public void testGetTaskAllocatorId()
{
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
index 6eff77184ea..5e79b129f3b 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
@@ -20,9 +20,16 @@
package org.apache.druid.msq.indexing;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
@@ -47,7 +54,6 @@ public class MSQWorkerTaskTest
@Test
public void testEquals()
{
- Assert.assertEquals(msqWorkerTask, msqWorkerTask);
Assert.assertEquals(
msqWorkerTask,
new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context,
retryCount)
@@ -108,4 +114,61 @@ public class MSQWorkerTaskTest
MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId,
dataSource, workerNumber, context, retryCount);
Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty());
}
+
+ @Test
+ public void testGetDefaultLookupLoadingSpec()
+ {
+ MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId,
dataSource, workerNumber, context, retryCount);
+ Assert.assertEquals(LookupLoadingSpec.ALL,
msqWorkerTask.getLookupLoadingSpec());
+ }
+
+ @Test
+ public void testGetLookupLoadingWithModeNoneInContext()
+ {
+ final ImmutableMap<String, Object> context =
ImmutableMap.of(PlannerContext.CTX_LOOKUP_LOADING_MODE,
LookupLoadingSpec.Mode.NONE);
+ MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId,
dataSource, workerNumber, context, retryCount);
+ Assert.assertEquals(LookupLoadingSpec.NONE,
msqWorkerTask.getLookupLoadingSpec());
+ }
+
+ @Test
+ public void testGetLookupLoadingSpecWithLookupListInContext()
+ {
+ final ImmutableMap<String, Object> context = ImmutableMap.of(
+ PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1",
"lookupName2"),
+ PlannerContext.CTX_LOOKUP_LOADING_MODE,
LookupLoadingSpec.Mode.ONLY_REQUIRED);
+ MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId,
dataSource, workerNumber, context, retryCount);
+ Assert.assertEquals(LookupLoadingSpec.Mode.ONLY_REQUIRED,
msqWorkerTask.getLookupLoadingSpec().getMode());
+ Assert.assertEquals(ImmutableSet.of("lookupName1", "lookupName2"),
msqWorkerTask.getLookupLoadingSpec().getLookupsToLoad());
+ }
+
+ @Test
+ public void testGetLookupLoadingSpecWithInvalidInput()
+ {
+ final HashMap<String, Object> context = new HashMap<>();
+ context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE,
LookupLoadingSpec.Mode.ONLY_REQUIRED);
+
+ // Setting CTX_LOOKUPS_TO_LOAD as null
+ context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, null);
+
+ MSQWorkerTask taskWithNullLookups = new MSQWorkerTask(controllerTaskId,
dataSource, workerNumber, context, retryCount);
+ DruidException exception = Assert.assertThrows(
+ DruidException.class,
+ taskWithNullLookups::getLookupLoadingSpec
+ );
+ Assert.assertEquals(
+ "Set of lookups to load cannot be null for mode[ONLY_REQUIRED].",
+ exception.getMessage());
+
+ // Setting CTX_LOOKUPS_TO_LOAD as empty list
+ context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, Collections.emptyList());
+
+ MSQWorkerTask taskWithEmptyLookups = new MSQWorkerTask(controllerTaskId,
dataSource, workerNumber, context, retryCount);
+ exception = Assert.assertThrows(
+ DruidException.class,
+ taskWithEmptyLookups::getLookupLoadingSpec
+ );
+ Assert.assertEquals(
+ "Set of lookups to load cannot be [] for mode[ONLY_REQUIRED].",
+ exception.getMessage());
+ }
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index fe78b481bee..cdaafc75c60 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -153,6 +153,7 @@ import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.sql.DirectStatement;
@@ -853,6 +854,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
protected CompactionState expectedLastCompactionState = null;
protected Set<Interval> expectedTombstoneIntervals = null;
protected List<Object[]> expectedResultRows = null;
+ protected LookupLoadingSpec expectedLookupLoadingSpec = null;
protected Matcher<Throwable> expectedValidationErrorMatcher = null;
protected List<Pair<Predicate<MSQTaskReportPayload>, String>>
adhocReportAssertionAndReasons = new ArrayList<>();
protected Matcher<Throwable> expectedExecutionErrorMatcher = null;
@@ -917,6 +919,12 @@ public class MSQTestBase extends BaseCalciteQueryTest
return asBuilder();
}
+ public Builder setExpectedLookupLoadingSpec(LookupLoadingSpec
lookupLoadingSpec)
+ {
+ this.expectedLookupLoadingSpec = lookupLoadingSpec;
+ return asBuilder();
+ }
+
public Builder setExpectedMSQSpec(MSQSpec expectedMSQSpec)
{
this.expectedMSQSpec = expectedMSQSpec;
@@ -1010,6 +1018,23 @@ public class MSQTestBase extends BaseCalciteQueryTest
assertThat(e, expectedValidationErrorMatcher);
}
+ protected void verifyLookupLoadingInfoInTaskContext(Map<String, Object>
context)
+ {
+ String lookupLoadingMode =
context.get(PlannerContext.CTX_LOOKUP_LOADING_MODE).toString();
+ List<String> lookupsToLoad = (List<String>)
context.get(PlannerContext.CTX_LOOKUPS_TO_LOAD);
+ if (expectedLookupLoadingSpec != null) {
+ Assert.assertEquals(expectedLookupLoadingSpec.getMode().toString(),
lookupLoadingMode);
+ if
(expectedLookupLoadingSpec.getMode().equals(LookupLoadingSpec.Mode.ONLY_REQUIRED))
{
+ Assert.assertEquals(new
ArrayList<>(expectedLookupLoadingSpec.getLookupsToLoad()), lookupsToLoad);
+ } else {
+ Assert.assertNull(lookupsToLoad);
+ }
+ } else {
+ Assert.assertEquals(LookupLoadingSpec.Mode.NONE.toString(),
lookupLoadingMode);
+ Assert.assertNull(lookupsToLoad);
+ }
+ }
+
protected void verifyWorkerCount(CounterSnapshotsTree counterSnapshotsTree)
{
Map<Integer, Map<Integer, CounterSnapshots>> counterMap =
counterSnapshotsTree.copyMap();
@@ -1165,7 +1190,9 @@ public class MSQTestBase extends BaseCalciteQueryTest
verifyWorkerCount(reportPayload.getCounters());
verifyCounters(reportPayload.getCounters());
- MSQSpec foundSpec =
indexingServiceClient.getMSQControllerTask(controllerId).getQuerySpec();
+ MSQControllerTask msqControllerTask =
indexingServiceClient.getMSQControllerTask(controllerId);
+ MSQSpec foundSpec = msqControllerTask.getQuerySpec();
+ verifyLookupLoadingInfoInTaskContext(msqControllerTask.getContext());
log.info(
"found generated segments: %s",
segmentManager.getAllDataSegments().stream().map(s ->
s.toString()).collect(
@@ -1393,6 +1420,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
throw new ISE("Query %s failed due to %s", sql,
payload.getStatus().getErrorReport().toString());
} else {
MSQControllerTask msqControllerTask =
indexingServiceClient.getMSQControllerTask(controllerId);
+ verifyLookupLoadingInfoInTaskContext(msqControllerTask.getContext());
final MSQSpec spec = msqControllerTask.getQuerySpec();
final List<Object[]> rows;
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java
b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java
index 9075456c812..8947bd60a01 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java
@@ -81,12 +81,16 @@ public class QueryLookupOperatorConversion implements
SqlOperatorConversion
final DruidExpression arg = inputExpressions.get(0);
final Expr lookupNameExpr =
plannerContext.parseExpression(inputExpressions.get(1).getExpression());
final String replaceMissingValueWith =
getReplaceMissingValueWith(inputExpressions, plannerContext);
+ final String lookupName = (String) lookupNameExpr.getLiteralValue();
+
+ // Add the lookup name to the set of lookups to selectively load.
+ plannerContext.addLookupToLoad(lookupName);
if (arg.isSimpleExtraction() && lookupNameExpr.isLiteral()) {
return arg.getSimpleExtraction().cascade(
new RegisteredLookupExtractionFn(
lookupExtractorFactoryContainerProvider,
- (String) lookupNameExpr.getLiteralValue(),
+ lookupName,
false,
replaceMissingValueWith,
null,
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
index 281fc66c8aa..99f721bffaa 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
@@ -43,6 +43,7 @@ import org.apache.druid.query.lookup.LookupExtractor;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.lookup.RegisteredLookupExtractionFn;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.ResourceAction;
@@ -61,6 +62,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -78,6 +80,8 @@ public class PlannerContext
public static final String CTX_SQL_CURRENT_TIMESTAMP = "sqlCurrentTimestamp";
public static final String CTX_SQL_TIME_ZONE = "sqlTimeZone";
public static final String CTX_SQL_JOIN_ALGORITHM = "sqlJoinAlgorithm";
+ public static final String CTX_LOOKUP_LOADING_MODE = "lookupLoadingMode";
+ public static final String CTX_LOOKUPS_TO_LOAD = "lookupsToLoad";
private static final JoinAlgorithm DEFAULT_SQL_JOIN_ALGORITHM =
JoinAlgorithm.BROADCAST;
/**
@@ -142,6 +146,7 @@ public class PlannerContext
// set of attributes for a SQL statement used in the EXPLAIN PLAN output
private ExplainAttributes explainAttributes;
private PlannerLookupCache lookupCache;
+ private final Set<String> lookupsToLoad = new HashSet<>();
private PlannerContext(
final PlannerToolbox plannerToolbox,
@@ -343,6 +348,22 @@ public class PlannerContext
return plannerToolbox.rootSchema().getResourceType(schema, resourceName);
}
+ /**
+ * Adds the given lookup name to the lookup loading spec.
+ */
+ public void addLookupToLoad(String lookupName)
+ {
+ lookupsToLoad.add(lookupName);
+ }
+
+ /**
+ * Returns the lookup to load for a given task.
+ */
+ public LookupLoadingSpec getLookupLoadingSpec()
+ {
+ return lookupsToLoad.isEmpty() ? LookupLoadingSpec.NONE :
LookupLoadingSpec.loadOnly(lookupsToLoad);
+ }
+
/**
* Return the query context as a mutable map. Use this form when
* modifying the context during planning.
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java
index 631936972e1..4300c7d574b 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java
@@ -33,6 +33,7 @@ import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.calcite.expression.AuthorizableOperator;
+import org.apache.druid.sql.calcite.schema.NamedLookupSchema;
import java.util.HashSet;
import java.util.List;
@@ -87,13 +88,19 @@ public class SqlResourceCollectorShuttle extends SqlShuttle
if (qualifiedNameParts.size() == 2) {
final String schema = qualifiedNameParts.get(0);
final String resourceName = qualifiedNameParts.get(1);
+
+ // Add the lookup name to the set of lookups to selectively load.
+ if (schema.equals(NamedLookupSchema.NAME)) {
+ plannerContext.addLookupToLoad(resourceName);
+ }
+
final String resourceType =
plannerContext.getSchemaResourceType(schema, resourceName);
if (resourceType != null) {
resourceActions.add(new ResourceAction(new Resource(resourceName,
resourceType), Action.READ));
}
} else if (qualifiedNameParts.size() > 2) {
// Don't expect to see more than 2 names (catalog?).
- throw new ISE("Cannot analyze table idetifier %s",
qualifiedNameParts);
+ throw new ISE("Cannot analyze table identifier %s",
qualifiedNameParts);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]