This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 775d654a6c8 Load only the required lookups for MSQ tasks (#16358)
775d654a6c8 is described below

commit 775d654a6c84fc6afbb1935010f11fed1bc35d18
Author: Akshat Jain <[email protected]>
AuthorDate: Thu May 9 11:21:54 2024 +0530

    Load only the required lookups for MSQ tasks (#16358)
    
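    With this change, MSQ tasks (MSQControllerTask and MSQWorkerTask) load only
    the lookups they require during querying and ingestion. MSQControllerTask
    loads no lookups. MSQWorkerTask selects its lookups from the
    CTX_LOOKUP_LOADING_MODE and CTX_LOOKUPS_TO_LOAD keys in its task context, and
    loads all lookups when no mode is set.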
---
 .../msq/indexing/IndexerControllerContext.java     | 15 +++++
 .../druid/msq/indexing/MSQControllerTask.java      |  7 +++
 .../apache/druid/msq/indexing/MSQWorkerTask.java   | 27 +++++++++
 .../apache/druid/msq/sql/MSQTaskQueryMaker.java    |  9 ++-
 .../org/apache/druid/msq/exec/MSQSelectTest.java   |  8 ++-
 .../druid/msq/indexing/MSQControllerTaskTest.java  | 53 ++++++++++++++++++
 .../druid/msq/indexing/MSQWorkerTaskTest.java      | 65 +++++++++++++++++++++-
 .../org/apache/druid/msq/test/MSQTestBase.java     | 30 +++++++++-
 .../builtin/QueryLookupOperatorConversion.java     |  6 +-
 .../druid/sql/calcite/planner/PlannerContext.java  | 21 +++++++
 .../planner/SqlResourceCollectorShuttle.java       |  9 ++-
 11 files changed, 244 insertions(+), 6 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 3ff71c3e1b7..e8fba09ddc5 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -55,6 +55,7 @@ import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.segment.realtime.firehose.ChatHandler;
 import org.apache.druid.server.DruidNode;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
 
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
@@ -269,6 +270,20 @@ public class IndexerControllerContext implements 
ControllerContext
         .put(MultiStageQueryContext.CTX_IS_REINDEX, 
MSQControllerTask.isReplaceInputDataSourceTask(querySpec))
         .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, 
queryKernelConfig.getMaxConcurrentStages());
 
+    // Copy the lookup loading info, if present in the controller task context, to allow selective lookup loading.
+    if (controllerTaskContext.get(PlannerContext.CTX_LOOKUP_LOADING_MODE) != 
null) {
+      taskContextOverridesBuilder.put(
+          PlannerContext.CTX_LOOKUP_LOADING_MODE,
+          controllerTaskContext.get(PlannerContext.CTX_LOOKUP_LOADING_MODE)
+      );
+    }
+    if (controllerTaskContext.get(PlannerContext.CTX_LOOKUPS_TO_LOAD) != null) 
{
+      taskContextOverridesBuilder.put(
+          PlannerContext.CTX_LOOKUPS_TO_LOAD,
+          controllerTaskContext.get(PlannerContext.CTX_LOOKUPS_TO_LOAD)
+      );
+    }
+
     if (querySpec.getDestination().toSelectDestination() != null) {
       taskContextOverridesBuilder.put(
           MultiStageQueryContext.CTX_SELECT_DESTINATION,
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
index 2a435215144..bdaf3964b29 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
@@ -58,6 +58,7 @@ import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.rpc.StandardRetryPolicy;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.sql.calcite.run.SqlResults;
@@ -333,4 +334,10 @@ public class MSQControllerTask extends AbstractTask 
implements ClientTaskQuery,
   {
     return querySpec.getDestination() instanceof DurableStorageMSQDestination;
   }
+
+  @Override
+  public LookupLoadingSpec getLookupLoadingSpec()
+  {
+    return LookupLoadingSpec.NONE; // The controller task never loads lookups, even if its context carries lookup info.
+  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
index b4d18ea390e..a23c62881a0 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Injector;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -37,9 +38,13 @@ import org.apache.druid.msq.exec.MSQTasks;
 import org.apache.druid.msq.exec.Worker;
 import org.apache.druid.msq.exec.WorkerContext;
 import org.apache.druid.msq.exec.WorkerImpl;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
 
 import javax.annotation.Nonnull;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -185,4 +190,49 @@ public class MSQWorkerTask extends AbstractTask
   {
     return Objects.hash(super.hashCode(), controllerTaskId, workerNumber, 
retryCount, worker);
   }
+
+  /**
+   * Returns the lookups this worker task should load, as selected by the
+   * {@link PlannerContext#CTX_LOOKUP_LOADING_MODE} and {@link PlannerContext#CTX_LOOKUPS_TO_LOAD} entries of its
+   * task context. All lookups are loaded when no mode is set.
+   *
+   * @throws org.apache.druid.error.DruidException if the mode is not a valid {@link LookupLoadingSpec.Mode}, or if
+   *                                               the mode is ONLY_REQUIRED but no lookups are listed
+   */
+  @Override
+  public LookupLoadingSpec getLookupLoadingSpec()
+  {
+    final Object lookupModeValue = getContext().get(PlannerContext.CTX_LOOKUP_LOADING_MODE);
+    if (lookupModeValue == null) {
+      return LookupLoadingSpec.ALL;
+    }
+
+    final LookupLoadingSpec.Mode lookupLoadingMode;
+    try {
+      lookupLoadingMode = LookupLoadingSpec.Mode.valueOf(lookupModeValue.toString());
+    }
+    catch (IllegalArgumentException e) {
+      // Report an unrecognized mode as invalid input, like the lookup-list check below, rather than
+      // letting the raw enum-parsing error escape.
+      throw InvalidInput.exception(
+          e,
+          "Invalid value[%s] for context parameter[%s].",
+          lookupModeValue,
+          PlannerContext.CTX_LOOKUP_LOADING_MODE
+      );
+    }
+
+    if (lookupLoadingMode == LookupLoadingSpec.Mode.NONE) {
+      return LookupLoadingSpec.NONE;
+    } else if (lookupLoadingMode == LookupLoadingSpec.Mode.ONLY_REQUIRED) {
+      @SuppressWarnings("unchecked")
+      final Collection<String> lookupsToLoad = (Collection<String>) getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD);
+      if (lookupsToLoad == null || lookupsToLoad.isEmpty()) {
+        throw InvalidInput.exception("Set of lookups to load cannot be %s for mode[ONLY_REQUIRED].", lookupsToLoad);
+      }
+      return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad));
+    } else {
+      return LookupLoadingSpec.ALL;
+    }
+  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index 8cc34547f99..533010c3057 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -52,6 +52,7 @@ import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.server.QueryResponse;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
 import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
 import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
@@ -283,6 +284,15 @@ public class MSQTaskQueryMaker implements QueryMaker
 
     MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec);
 
+    // Pass the lookups this query needs to the controller task through its context. The spec is read once so
+    // that the mode and the lookup list written below always come from the same snapshot.
+    final LookupLoadingSpec lookupLoadingSpec = plannerContext.getLookupLoadingSpec();
+    final Map<String, Object> context = new HashMap<>();
+    context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, lookupLoadingSpec.getMode());
+    if (lookupLoadingSpec.getMode() == LookupLoadingSpec.Mode.ONLY_REQUIRED) {
+      context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, lookupLoadingSpec.getLookupsToLoad());
+    }
+
     final MSQControllerTask controllerTask = new MSQControllerTask(
         taskId,
         querySpec.withOverriddenContext(nativeQueryContext),
@@ -291,7 +301,7 @@ public class MSQTaskQueryMaker implements QueryMaker
         SqlResults.Context.fromPlannerContext(plannerContext),
         sqlTypeNames,
         columnTypeList,
-        null
+        context
     );
 
     FutureUtils.getUnchecked(overlordClient.runTask(taskId, controllerTask), 
true);
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
index 56f1ce98696..84dddd526c1 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.exec;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.JsonInputFormat;
@@ -70,6 +71,7 @@ import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.join.JoinType;
 import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.sql.calcite.expression.DruidExpression;
 import org.apache.druid.sql.calcite.external.ExternalDataSource;
 import org.apache.druid.sql.calcite.filtration.Filtration;
@@ -227,7 +229,9 @@ public class MSQSelectTest extends MSQTestBase
             new Object[]{1L, "1"},
             new Object[]{1L, "def"},
             new Object[]{1L, "abc"}
-        )).verifyResults();
+        ))
+        .setExpectedLookupLoadingSpec(LookupLoadingSpec.NONE)
+        .verifyResults();
   }
 
   @MethodSource("data")
@@ -742,6 +746,7 @@ public class MSQSelectTest extends MSQTestBase
                    .build())
         .setExpectedRowSignature(rowSignature)
         .setExpectedResultRows(ImmutableList.of(new Object[]{4L}))
+        
.setExpectedLookupLoadingSpec(LookupLoadingSpec.loadOnly(ImmutableSet.of("lookyloo")))
         .verifyResults();
   }
 
@@ -808,6 +813,7 @@ public class MSQSelectTest extends MSQTestBase
                 new Object[]{"xabc", 1L}
             )
         )
+        
.setExpectedLookupLoadingSpec(LookupLoadingSpec.loadOnly(ImmutableSet.of("lookyloo")))
         .verifyResults();
   }
 
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
index e0eee251f72..a5001fb58eb 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.msq.indexing;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
@@ -34,12 +35,15 @@ import 
org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.scan.ScanQuery;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.sql.calcite.planner.ColumnMapping;
 import org.apache.druid.sql.calcite.planner.ColumnMappings;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -84,6 +88,55 @@ public class MSQControllerTaskTest
     Assert.assertTrue(controllerTask.getInputSourceResources().isEmpty());
   }
 
+  @Test
+  public void testGetDefaultLookupLoadingSpec()
+  {
+    MSQControllerTask controllerTask = new MSQControllerTask(
+        null,
+        MSQ_SPEC,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+    Assert.assertEquals(LookupLoadingSpec.NONE, 
controllerTask.getLookupLoadingSpec());
+  }
+
+  @Test
+  public void testGetLookupLoadingSpecUsingLookupLoadingInfoInContext()
+  {
+    MSQSpec build = MSQSpec
+        .builder()
+        .query(new Druids.ScanQueryBuilder()
+                   .intervals(new MultipleIntervalSegmentSpec(INTERVALS))
+                   .dataSource("target")
+                   .context(
+                       ImmutableMap.of(
+                           PlannerContext.CTX_LOOKUPS_TO_LOAD, 
Arrays.asList("lookupName1", "lookupName2"),
+                           PlannerContext.CTX_LOOKUP_LOADING_MODE, 
LookupLoadingSpec.Mode.ONLY_REQUIRED)
+                   )
+                   .build()
+        )
+        .columnMappings(new ColumnMappings(Collections.emptyList()))
+        .tuningConfig(MSQTuningConfig.defaultConfig())
+        .build();
+    MSQControllerTask controllerTask = new MSQControllerTask(
+        null,
+        build,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+
+    // Validate that the MSQ controller task doesn't load any lookups, even if the query context has lookup info.
+    Assert.assertEquals(LookupLoadingSpec.NONE, 
controllerTask.getLookupLoadingSpec());
+  }
+
   @Test
   public void testGetTaskAllocatorId()
   {
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
index 6eff77184ea..5e79b129f3b 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
@@ -20,9 +20,16 @@
 package org.apache.druid.msq.indexing;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -47,7 +54,6 @@ public class MSQWorkerTaskTest
   @Test
   public void testEquals()
   {
-    Assert.assertEquals(msqWorkerTask, msqWorkerTask);
     Assert.assertEquals(
         msqWorkerTask,
         new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, 
retryCount)
@@ -108,4 +114,61 @@ public class MSQWorkerTaskTest
     MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, 
dataSource, workerNumber, context, retryCount);
     Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty());
   }
+
+  @Test
+  public void testGetDefaultLookupLoadingSpec()
+  {
+    MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, 
dataSource, workerNumber, context, retryCount);
+    Assert.assertEquals(LookupLoadingSpec.ALL, 
msqWorkerTask.getLookupLoadingSpec());
+  }
+
+  @Test
+  public void testGetLookupLoadingWithModeNoneInContext()
+  {
+    final ImmutableMap<String, Object> context = 
ImmutableMap.of(PlannerContext.CTX_LOOKUP_LOADING_MODE, 
LookupLoadingSpec.Mode.NONE);
+    MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, 
dataSource, workerNumber, context, retryCount);
+    Assert.assertEquals(LookupLoadingSpec.NONE, 
msqWorkerTask.getLookupLoadingSpec());
+  }
+
+  @Test
+  public void testGetLookupLoadingSpecWithLookupListInContext()
+  {
+    final ImmutableMap<String, Object> context = ImmutableMap.of(
+        PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", 
"lookupName2"),
+        PlannerContext.CTX_LOOKUP_LOADING_MODE, 
LookupLoadingSpec.Mode.ONLY_REQUIRED);
+    MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, 
dataSource, workerNumber, context, retryCount);
+    Assert.assertEquals(LookupLoadingSpec.Mode.ONLY_REQUIRED, 
msqWorkerTask.getLookupLoadingSpec().getMode());
+    Assert.assertEquals(ImmutableSet.of("lookupName1", "lookupName2"), 
msqWorkerTask.getLookupLoadingSpec().getLookupsToLoad());
+  }
+
+  @Test
+  public void testGetLookupLoadingSpecWithInvalidInput()
+  {
+    final HashMap<String, Object> context = new HashMap<>();
+    context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, 
LookupLoadingSpec.Mode.ONLY_REQUIRED);
+
+    // A null CTX_LOOKUPS_TO_LOAD is rejected in mode ONLY_REQUIRED.
+    context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, null);
+
+    MSQWorkerTask taskWithNullLookups = new MSQWorkerTask(controllerTaskId, 
dataSource, workerNumber, context, retryCount);
+    DruidException exception = Assert.assertThrows(
+        DruidException.class,
+        taskWithNullLookups::getLookupLoadingSpec
+    );
+    Assert.assertEquals(
+        "Set of lookups to load cannot be null for mode[ONLY_REQUIRED].",
+        exception.getMessage());
+
+    // An empty CTX_LOOKUPS_TO_LOAD list is rejected as well.
+    context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, Collections.emptyList());
+
+    MSQWorkerTask taskWithEmptyLookups = new MSQWorkerTask(controllerTaskId, 
dataSource, workerNumber, context, retryCount);
+    exception = Assert.assertThrows(
+        DruidException.class,
+        taskWithEmptyLookups::getLookupLoadingSpec
+    );
+    Assert.assertEquals(
+        "Set of lookups to load cannot be [] for mode[ONLY_REQUIRED].",
+        exception.getMessage());
+  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index fe78b481bee..cdaafc75c60 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -153,6 +153,7 @@ import org.apache.druid.server.SegmentManager;
 import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.sql.DirectStatement;
@@ -853,6 +854,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
     protected CompactionState expectedLastCompactionState = null;
     protected Set<Interval> expectedTombstoneIntervals = null;
     protected List<Object[]> expectedResultRows = null;
+    protected LookupLoadingSpec expectedLookupLoadingSpec = null;
     protected Matcher<Throwable> expectedValidationErrorMatcher = null;
     protected List<Pair<Predicate<MSQTaskReportPayload>, String>> 
adhocReportAssertionAndReasons = new ArrayList<>();
     protected Matcher<Throwable> expectedExecutionErrorMatcher = null;
@@ -917,6 +919,12 @@ public class MSQTestBase extends BaseCalciteQueryTest
       return asBuilder();
     }
 
+    public Builder setExpectedLookupLoadingSpec(LookupLoadingSpec 
lookupLoadingSpec)
+    {
+      this.expectedLookupLoadingSpec = lookupLoadingSpec;
+      return asBuilder();
+    }
+
     public Builder setExpectedMSQSpec(MSQSpec expectedMSQSpec)
     {
       this.expectedMSQSpec = expectedMSQSpec;
@@ -1010,6 +1018,23 @@ public class MSQTestBase extends BaseCalciteQueryTest
       assertThat(e, expectedValidationErrorMatcher);
     }
 
+    protected void verifyLookupLoadingInfoInTaskContext(Map<String, Object> context)
+    {
+      // With no explicit expectation, no lookups may be requested. Lookups come from a set, so compare unordered.
+      final LookupLoadingSpec expected =
+          expectedLookupLoadingSpec == null ? LookupLoadingSpec.NONE : expectedLookupLoadingSpec;
+      final Object actualMode = context.get(PlannerContext.CTX_LOOKUP_LOADING_MODE);
+      Assert.assertEquals(String.valueOf(expected.getMode()), String.valueOf(actualMode));
+      final List<String> lookupsToLoad = (List<String>) context.get(PlannerContext.CTX_LOOKUPS_TO_LOAD);
+      if (expected.getMode() != LookupLoadingSpec.Mode.ONLY_REQUIRED) {
+        Assert.assertNull(lookupsToLoad);
+        return;
+      }
+      Assert.assertNotNull(lookupsToLoad);
+      Assert.assertEquals(expected.getLookupsToLoad().size(), lookupsToLoad.size());
+      Assert.assertTrue(lookupsToLoad.containsAll(expected.getLookupsToLoad()));
+    }
+
     protected void verifyWorkerCount(CounterSnapshotsTree counterSnapshotsTree)
     {
       Map<Integer, Map<Integer, CounterSnapshots>> counterMap = 
counterSnapshotsTree.copyMap();
@@ -1165,7 +1190,9 @@ public class MSQTestBase extends BaseCalciteQueryTest
         verifyWorkerCount(reportPayload.getCounters());
         verifyCounters(reportPayload.getCounters());
 
-        MSQSpec foundSpec = 
indexingServiceClient.getMSQControllerTask(controllerId).getQuerySpec();
+        MSQControllerTask msqControllerTask = 
indexingServiceClient.getMSQControllerTask(controllerId);
+        MSQSpec foundSpec = msqControllerTask.getQuerySpec();
+        verifyLookupLoadingInfoInTaskContext(msqControllerTask.getContext());
         log.info(
             "found generated segments: %s",
             segmentManager.getAllDataSegments().stream().map(s -> 
s.toString()).collect(
@@ -1393,6 +1420,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
           throw new ISE("Query %s failed due to %s", sql, 
payload.getStatus().getErrorReport().toString());
         } else {
           MSQControllerTask msqControllerTask = 
indexingServiceClient.getMSQControllerTask(controllerId);
+          verifyLookupLoadingInfoInTaskContext(msqControllerTask.getContext());
 
           final MSQSpec spec = msqControllerTask.getQuerySpec();
           final List<Object[]> rows;
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java
index 9075456c812..8947bd60a01 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/QueryLookupOperatorConversion.java
@@ -81,12 +81,19 @@ public class QueryLookupOperatorConversion implements 
SqlOperatorConversion
           final DruidExpression arg = inputExpressions.get(0);
           final Expr lookupNameExpr = 
plannerContext.parseExpression(inputExpressions.get(1).getExpression());
           final String replaceMissingValueWith = 
getReplaceMissingValueWith(inputExpressions, plannerContext);
+          // Only a literal lookup name is known at planning time. Check isLiteral() before getLiteralValue(), as the
+          // simple-extraction branch below does, and record the name in the set of lookups to selectively load.
+          final String lookupName = lookupNameExpr.isLiteral() ? (String) lookupNameExpr.getLiteralValue() : null;
+          if (lookupName != null) {
+            plannerContext.addLookupToLoad(lookupName);
+          }
 
           if (arg.isSimpleExtraction() && lookupNameExpr.isLiteral()) {
             return arg.getSimpleExtraction().cascade(
                 new RegisteredLookupExtractionFn(
                     lookupExtractorFactoryContainerProvider,
-                    (String) lookupNameExpr.getLiteralValue(),
+                    lookupName,
                     false,
                     replaceMissingValueWith,
                     null,
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
index 281fc66c8aa..99f721bffaa 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
@@ -43,6 +43,7 @@ import org.apache.druid.query.lookup.LookupExtractor;
 import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.query.lookup.RegisteredLookupExtractionFn;
 import org.apache.druid.segment.join.JoinableFactoryWrapper;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.server.security.Access;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.ResourceAction;
@@ -61,6 +62,7 @@ import org.joda.time.Interval;
 import javax.annotation.Nullable;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -78,6 +80,8 @@ public class PlannerContext
   public static final String CTX_SQL_CURRENT_TIMESTAMP = "sqlCurrentTimestamp";
   public static final String CTX_SQL_TIME_ZONE = "sqlTimeZone";
   public static final String CTX_SQL_JOIN_ALGORITHM = "sqlJoinAlgorithm";
+  public static final String CTX_LOOKUP_LOADING_MODE = "lookupLoadingMode";
+  public static final String CTX_LOOKUPS_TO_LOAD = "lookupsToLoad";
   private static final JoinAlgorithm DEFAULT_SQL_JOIN_ALGORITHM = 
JoinAlgorithm.BROADCAST;
 
   /**
@@ -142,6 +146,7 @@ public class PlannerContext
   // set of attributes for a SQL statement used in the EXPLAIN PLAN output
   private ExplainAttributes explainAttributes;
   private PlannerLookupCache lookupCache;
+  private final Set<String> lookupsToLoad = new HashSet<>();
 
   private PlannerContext(
       final PlannerToolbox plannerToolbox,
@@ -343,6 +348,27 @@ public class PlannerContext
     return plannerToolbox.rootSchema().getResourceType(schema, resourceName);
   }
 
+  /**
+   * Records that the query being planned uses the given lookup, so that it is included in
+   * {@link #getLookupLoadingSpec()}. Null names are ignored.
+   */
+  public void addLookupToLoad(@Nullable String lookupName)
+  {
+    if (lookupName != null) {
+      lookupsToLoad.add(lookupName);
+    }
+  }
+
+  /**
+   * Returns which lookups tasks need to load for the query being planned: {@link LookupLoadingSpec#NONE} if no
+   * lookup has been recorded, otherwise only the recorded lookups. The spec is built from a copy of the recorded
+   * names, so later calls to {@link #addLookupToLoad} do not affect a spec that was already returned.
+   */
+  public LookupLoadingSpec getLookupLoadingSpec()
+  {
+    return lookupsToLoad.isEmpty() ? LookupLoadingSpec.NONE : LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad));
+  }
+
   /**
    * Return the query context as a mutable map. Use this form when
    * modifying the context during planning.
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java
index 631936972e1..4300c7d574b 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlResourceCollectorShuttle.java
@@ -33,6 +33,7 @@ import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.sql.calcite.expression.AuthorizableOperator;
+import org.apache.druid.sql.calcite.schema.NamedLookupSchema;
 
 import java.util.HashSet;
 import java.util.List;
@@ -87,13 +88,19 @@ public class SqlResourceCollectorShuttle extends SqlShuttle
         if (qualifiedNameParts.size() == 2) {
           final String schema = qualifiedNameParts.get(0);
           final String resourceName = qualifiedNameParts.get(1);
+
+          // Tables in the lookup schema are lookups: record them so that only the required lookups get loaded.
+          if (schema.equals(NamedLookupSchema.NAME)) {
+            plannerContext.addLookupToLoad(resourceName);
+          }
+
           final String resourceType = 
plannerContext.getSchemaResourceType(schema, resourceName);
           if (resourceType != null) {
             resourceActions.add(new ResourceAction(new Resource(resourceName, 
resourceType), Action.READ));
           }
         } else if (qualifiedNameParts.size() > 2) {
           // Don't expect to see more than 2 names (catalog?).
-          throw new ISE("Cannot analyze table idetifier %s", 
qualifiedNameParts);
+          throw new ISE("Cannot analyze table identifier %s", 
qualifiedNameParts);
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to