This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ddfd62d9a98 Disable loading lookups by default in CompactionTask 
(#16420)
ddfd62d9a98 is described below

commit ddfd62d9a981e14ba197404eaf6c08646e92daa2
Author: Akshat Jain <[email protected]>
AuthorDate: Wed May 15 11:39:23 2024 +0530

    Disable loading lookups by default in CompactionTask (#16420)
    
    This PR updates CompactionTask to not load any lookups by default, unless 
transformSpec is present.
    
    If transformSpec is present, we will make the decision based on context 
values, loading all lookups by default. This is done to ensure backward 
compatibility since transformSpec can reference lookups.
    If transformSpec is not present and no context value is passed, we do not 
load any lookups.
    
    This behavior can be overridden by supplying lookupLoadingMode and 
lookupsToLoad in the task context.
---
 .../msq/indexing/IndexerControllerContext.java     |  14 +-
 .../apache/druid/msq/indexing/MSQWorkerTask.java   |  27 --
 .../apache/druid/msq/sql/MSQTaskQueryMaker.java    |   4 +-
 .../druid/msq/indexing/MSQControllerTaskTest.java  |   5 +-
 .../druid/msq/indexing/MSQWorkerTaskTest.java      |  13 +-
 .../org/apache/druid/msq/test/MSQTestBase.java     |  17 +-
 .../druid/indexing/common/task/CompactionTask.java |   9 +
 .../apache/druid/indexing/common/task/Task.java    |   7 +-
 .../task/ClientCompactionTaskQuerySerdeTest.java   | 317 +++++++++++----------
 .../indexing/common/task/CompactionTaskTest.java   |  30 ++
 .../druid/indexing/common/task/TaskTest.java       |   7 +
 .../server/lookup/cache/LookupLoadingSpec.java     |  72 +++++
 .../server/lookup/cache/LookupLoadingSpecTest.java | 124 ++++++++
 .../druid/sql/calcite/planner/PlannerContext.java  |   4 +-
 14 files changed, 429 insertions(+), 221 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index e8fba09ddc5..17ac82d736b 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -55,7 +55,7 @@ import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.segment.realtime.firehose.ChatHandler;
 import org.apache.druid.server.DruidNode;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
@@ -271,16 +271,16 @@ public class IndexerControllerContext implements 
ControllerContext
         .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, 
queryKernelConfig.getMaxConcurrentStages());
 
     // Put the lookup loading info in the task context to facilitate selective 
loading of lookups.
-    if (controllerTaskContext.get(PlannerContext.CTX_LOOKUP_LOADING_MODE) != 
null) {
+    if (controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE) 
!= null) {
       taskContextOverridesBuilder.put(
-          PlannerContext.CTX_LOOKUP_LOADING_MODE,
-          controllerTaskContext.get(PlannerContext.CTX_LOOKUP_LOADING_MODE)
+          LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE,
+          controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE)
       );
     }
-    if (controllerTaskContext.get(PlannerContext.CTX_LOOKUPS_TO_LOAD) != null) 
{
+    if (controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD) != 
null) {
       taskContextOverridesBuilder.put(
-          PlannerContext.CTX_LOOKUPS_TO_LOAD,
-          controllerTaskContext.get(PlannerContext.CTX_LOOKUPS_TO_LOAD)
+          LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD,
+          controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD)
       );
     }
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
index a23c62881a0..b4d18ea390e 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
@@ -27,7 +27,6 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Injector;
-import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -38,13 +37,9 @@ import org.apache.druid.msq.exec.MSQTasks;
 import org.apache.druid.msq.exec.Worker;
 import org.apache.druid.msq.exec.WorkerContext;
 import org.apache.druid.msq.exec.WorkerImpl;
-import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
 
 import javax.annotation.Nonnull;
-import java.util.Collection;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -190,26 +185,4 @@ public class MSQWorkerTask extends AbstractTask
   {
     return Objects.hash(super.hashCode(), controllerTaskId, workerNumber, 
retryCount, worker);
   }
-
-  @Override
-  public LookupLoadingSpec getLookupLoadingSpec()
-  {
-    final Object lookupModeValue = 
getContext().get(PlannerContext.CTX_LOOKUP_LOADING_MODE);
-    if (lookupModeValue == null) {
-      return LookupLoadingSpec.ALL;
-    }
-
-    final LookupLoadingSpec.Mode lookupLoadingMode = 
LookupLoadingSpec.Mode.valueOf(lookupModeValue.toString());
-    if (lookupLoadingMode == LookupLoadingSpec.Mode.NONE) {
-      return LookupLoadingSpec.NONE;
-    } else if (lookupLoadingMode == LookupLoadingSpec.Mode.ONLY_REQUIRED) {
-      Collection<String> lookupsToLoad = (Collection<String>) 
getContext().get(PlannerContext.CTX_LOOKUPS_TO_LOAD);
-      if (lookupsToLoad == null || lookupsToLoad.isEmpty()) {
-        throw InvalidInput.exception("Set of lookups to load cannot be %s for 
mode[ONLY_REQUIRED].", lookupsToLoad);
-      }
-      return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad));
-    } else {
-      return LookupLoadingSpec.ALL;
-    }
-  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index 533010c3057..4debe4d9d43 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -285,9 +285,9 @@ public class MSQTaskQueryMaker implements QueryMaker
     MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec);
 
     final Map<String, Object> context = new HashMap<>();
-    context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, 
plannerContext.getLookupLoadingSpec().getMode());
+    context.put(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, 
plannerContext.getLookupLoadingSpec().getMode());
     if (plannerContext.getLookupLoadingSpec().getMode() == 
LookupLoadingSpec.Mode.ONLY_REQUIRED) {
-      context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, 
plannerContext.getLookupLoadingSpec().getLookupsToLoad());
+      context.put(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, 
plannerContext.getLookupLoadingSpec().getLookupsToLoad());
     }
 
     final MSQControllerTask controllerTask = new MSQControllerTask(
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
index a5001fb58eb..9de14610f19 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
@@ -38,7 +38,6 @@ import 
org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.sql.calcite.planner.ColumnMapping;
 import org.apache.druid.sql.calcite.planner.ColumnMappings;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Test;
@@ -114,8 +113,8 @@ public class MSQControllerTaskTest
                    .dataSource("target")
                    .context(
                        ImmutableMap.of(
-                           PlannerContext.CTX_LOOKUPS_TO_LOAD, 
Arrays.asList("lookupName1", "lookupName2"),
-                           PlannerContext.CTX_LOOKUP_LOADING_MODE, 
LookupLoadingSpec.Mode.ONLY_REQUIRED)
+                           LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, 
Arrays.asList("lookupName1", "lookupName2"),
+                           LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, 
LookupLoadingSpec.Mode.ONLY_REQUIRED)
                    )
                    .build()
         )
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
index 5e79b129f3b..3672e9d1c29 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -125,7 +124,7 @@ public class MSQWorkerTaskTest
   @Test
   public void testGetLookupLoadingWithModeNoneInContext()
   {
-    final ImmutableMap<String, Object> context = 
ImmutableMap.of(PlannerContext.CTX_LOOKUP_LOADING_MODE, 
LookupLoadingSpec.Mode.NONE);
+    final ImmutableMap<String, Object> context = 
ImmutableMap.of(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, 
LookupLoadingSpec.Mode.NONE);
     MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, 
dataSource, workerNumber, context, retryCount);
     Assert.assertEquals(LookupLoadingSpec.NONE, 
msqWorkerTask.getLookupLoadingSpec());
   }
@@ -134,8 +133,8 @@ public class MSQWorkerTaskTest
   public void testGetLookupLoadingSpecWithLookupListInContext()
   {
     final ImmutableMap<String, Object> context = ImmutableMap.of(
-        PlannerContext.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", 
"lookupName2"),
-        PlannerContext.CTX_LOOKUP_LOADING_MODE, 
LookupLoadingSpec.Mode.ONLY_REQUIRED);
+        LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, Arrays.asList("lookupName1", 
"lookupName2"),
+        LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, 
LookupLoadingSpec.Mode.ONLY_REQUIRED);
     MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, 
dataSource, workerNumber, context, retryCount);
     Assert.assertEquals(LookupLoadingSpec.Mode.ONLY_REQUIRED, 
msqWorkerTask.getLookupLoadingSpec().getMode());
     Assert.assertEquals(ImmutableSet.of("lookupName1", "lookupName2"), 
msqWorkerTask.getLookupLoadingSpec().getLookupsToLoad());
@@ -145,10 +144,10 @@ public class MSQWorkerTaskTest
   public void testGetLookupLoadingSpecWithInvalidInput()
   {
     final HashMap<String, Object> context = new HashMap<>();
-    context.put(PlannerContext.CTX_LOOKUP_LOADING_MODE, 
LookupLoadingSpec.Mode.ONLY_REQUIRED);
+    context.put(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, 
LookupLoadingSpec.Mode.ONLY_REQUIRED);
 
     // Setting CTX_LOOKUPS_TO_LOAD as null
-    context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, null);
+    context.put(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, null);
 
     MSQWorkerTask taskWithNullLookups = new MSQWorkerTask(controllerTaskId, 
dataSource, workerNumber, context, retryCount);
     DruidException exception = Assert.assertThrows(
@@ -160,7 +159,7 @@ public class MSQWorkerTaskTest
         exception.getMessage());
 
     // Setting CTX_LOOKUPS_TO_LOAD as empty list
-    context.put(PlannerContext.CTX_LOOKUPS_TO_LOAD, Collections.emptyList());
+    context.put(LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, 
Collections.emptyList());
 
     MSQWorkerTask taskWithEmptyLookups = new MSQWorkerTask(controllerTaskId, 
dataSource, workerNumber, context, retryCount);
     exception = Assert.assertThrows(
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index d9b6e28b32b..e6fb74877d4 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -855,7 +855,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
     protected CompactionState expectedLastCompactionState = null;
     protected Set<Interval> expectedTombstoneIntervals = null;
     protected List<Object[]> expectedResultRows = null;
-    protected LookupLoadingSpec expectedLookupLoadingSpec = null;
+    protected LookupLoadingSpec expectedLookupLoadingSpec = 
LookupLoadingSpec.NONE;
     protected Matcher<Throwable> expectedValidationErrorMatcher = null;
     protected List<Pair<Predicate<MSQTaskReportPayload>, String>> 
adhocReportAssertionAndReasons = new ArrayList<>();
     protected Matcher<Throwable> expectedExecutionErrorMatcher = null;
@@ -1021,19 +1021,8 @@ public class MSQTestBase extends BaseCalciteQueryTest
 
     protected void verifyLookupLoadingInfoInTaskContext(Map<String, Object> 
context)
     {
-      String lookupLoadingMode = 
context.get(PlannerContext.CTX_LOOKUP_LOADING_MODE).toString();
-      List<String> lookupsToLoad = (List<String>) 
context.get(PlannerContext.CTX_LOOKUPS_TO_LOAD);
-      if (expectedLookupLoadingSpec != null) {
-        Assert.assertEquals(expectedLookupLoadingSpec.getMode().toString(), 
lookupLoadingMode);
-        if 
(expectedLookupLoadingSpec.getMode().equals(LookupLoadingSpec.Mode.ONLY_REQUIRED))
 {
-          Assert.assertEquals(new 
ArrayList<>(expectedLookupLoadingSpec.getLookupsToLoad()), lookupsToLoad);
-        } else {
-          Assert.assertNull(lookupsToLoad);
-        }
-      } else {
-        Assert.assertEquals(LookupLoadingSpec.Mode.NONE.toString(), 
lookupLoadingMode);
-        Assert.assertNull(lookupsToLoad);
-      }
+      LookupLoadingSpec specFromContext = 
LookupLoadingSpec.createFromContext(context, LookupLoadingSpec.ALL);
+      Assert.assertEquals(expectedLookupLoadingSpec, specFromContext);
     }
 
     protected void verifyWorkerCount(CounterSnapshotsTree counterSnapshotsTree)
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 833bcdd2fed..81447f3fd5e 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -92,6 +92,7 @@ import 
org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.apache.druid.server.coordinator.duty.CompactSegments;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentTimeline;
@@ -249,6 +250,14 @@ public class CompactionTask extends AbstractBatchIndexTask 
implements PendingSeg
     this.segmentProvider = new SegmentProvider(dataSource, 
this.ioConfig.getInputSpec());
     this.partitionConfigurationManager = new 
PartitionConfigurationManager(this.tuningConfig);
     this.segmentCacheManagerFactory = segmentCacheManagerFactory;
+
+    // Do not load any lookups in sub-tasks launched by compaction task, 
unless transformSpec is present.
+    // If transformSpec is present, we will not modify the context so that the 
sub-tasks can make the
+    // decision based on context values, loading all lookups by default.
+    // This is done to ensure backward compatibility since transformSpec can 
reference lookups.
+    if (transformSpec == null) {
+      addToContextIfAbsent(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, 
LookupLoadingSpec.Mode.NONE.toString());
+    }
   }
 
   @VisibleForTesting
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index cdf7cea7e3f..18d2ac7d696 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -334,9 +334,14 @@ public interface Task
     );
   }
 
+  /**
+   * Specifies the list of lookups to load for this task. Tasks load ALL 
lookups by default.
+   * This behaviour can be overridden by passing parameters {@link 
LookupLoadingSpec#CTX_LOOKUP_LOADING_MODE}
+   * and {@link LookupLoadingSpec#CTX_LOOKUPS_TO_LOAD} in the task context.
+   */
   @Nullable
   default LookupLoadingSpec getLookupLoadingSpec()
   {
-    return LookupLoadingSpec.ALL;
+    return LookupLoadingSpec.createFromContext(getContext(), 
LookupLoadingSpec.ALL);
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index fc581d4954b..f519519095c 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -63,6 +63,7 @@ import 
org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.joda.time.Duration;
@@ -71,6 +72,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.Map;
 
 public class ClientCompactionTaskQuerySerdeTest
 {
@@ -78,61 +80,116 @@ public class ClientCompactionTaskQuerySerdeTest
       new TestUtils().getRowIngestionMetersFactory();
   private static final CoordinatorClient COORDINATOR_CLIENT = new 
NoopCoordinatorClient();
   private static final AppenderatorsManager APPENDERATORS_MANAGER = new 
TestAppenderatorsManager();
+  private static final ObjectMapper MAPPER = 
setupInjectablesInObjectMapper(new DefaultObjectMapper());
+
+  private static final IndexSpec INDEX_SPEC = IndexSpec.builder()
+                                                       
.withDimensionCompression(CompressionStrategy.LZ4)
+                                                       
.withMetricCompression(CompressionStrategy.LZF)
+                                                       
.withLongEncoding(LongEncodingStrategy.LONGS)
+                                                       .build();
+  private static final IndexSpec INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS = 
IndexSpec.builder()
+                                                                               
  .withDimensionCompression(CompressionStrategy.LZ4)
+                                                                               
  .withMetricCompression(CompressionStrategy.UNCOMPRESSED)
+                                                                               
  .withLongEncoding(LongEncodingStrategy.AUTO)
+                                                                               
  .build();
+  private static final ClientCompactionTaskGranularitySpec 
CLIENT_COMPACTION_TASK_GRANULARITY_SPEC =
+      new ClientCompactionTaskGranularitySpec(Granularities.DAY, 
Granularities.HOUR, true);
+  private static final AggregatorFactory[] METRICS_SPEC = new 
AggregatorFactory[] {new CountAggregatorFactory("cnt")};
+  private static final ClientCompactionTaskTransformSpec 
CLIENT_COMPACTION_TASK_TRANSFORM_SPEC =
+      new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", 
"foo", null));
+  private static final DynamicPartitionsSpec DYNAMIC_PARTITIONS_SPEC = new 
DynamicPartitionsSpec(100, 30000L);
+  private static final SegmentsSplitHintSpec SEGMENTS_SPLIT_HINT_SPEC = new 
SegmentsSplitHintSpec(new HumanReadableBytes(100000L), 10);
 
   @Test
   public void testClientCompactionTaskQueryToCompactionTask() throws 
IOException
   {
-    final ObjectMapper mapper = setupInjectablesInObjectMapper(new 
DefaultObjectMapper());
-    final ClientCompactionTaskQuery query = new ClientCompactionTaskQuery(
-        "id",
-        "datasource",
-        new ClientCompactionIOConfig(
-            new ClientCompactionIntervalSpec(
-                Intervals.of("2019/2020"),
-                "testSha256OfSortedSegmentIds"
-            ),
-            true
-        ),
-        new ClientCompactionTaskQueryTuningConfig(
-            null,
-            null,
-            40000,
-            2000L,
-            null,
-            new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), 10),
-            new DynamicPartitionsSpec(100, 30000L),
-            IndexSpec.builder()
-                     .withDimensionCompression(CompressionStrategy.LZ4)
-                     .withMetricCompression(CompressionStrategy.LZF)
-                     .withLongEncoding(LongEncodingStrategy.LONGS)
-                     .build(),
-            IndexSpec.builder()
-                     .withDimensionCompression(CompressionStrategy.LZ4)
-                     .withMetricCompression(CompressionStrategy.UNCOMPRESSED)
-                     .withLongEncoding(LongEncodingStrategy.AUTO)
-                     .build(),
-            2,
-            1000L,
-            TmpFileSegmentWriteOutMediumFactory.instance(),
-            100,
-            5,
-            1000L,
-            new Duration(3000L),
-            7,
-            1000,
-            100,
-            2
+    final ClientCompactionTaskQuery query = createCompactionTaskQuery("id", 
CLIENT_COMPACTION_TASK_TRANSFORM_SPEC);
+
+    final byte[] json = MAPPER.writeValueAsBytes(query);
+    final CompactionTask task = (CompactionTask) MAPPER.readValue(json, 
Task.class);
+
+    assertQueryToTask(query, task);
+  }
+
+  @Test
+  public void 
testClientCompactionTaskQueryToCompactionTaskWithoutTransformSpec() throws 
IOException
+  {
+    final ClientCompactionTaskQuery query = createCompactionTaskQuery("id", 
null);
+
+    final byte[] json = MAPPER.writeValueAsBytes(query);
+    final CompactionTask task = (CompactionTask) MAPPER.readValue(json, 
Task.class);
+
+    // Verify that CompactionTask has added new parameters into the context 
because transformSpec was null.
+    Assert.assertNotEquals(query.getContext(), task.getContext());
+    query.getContext().put(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, 
LookupLoadingSpec.Mode.NONE.toString());
+    assertQueryToTask(query, task);
+  }
+
+  @Test
+  public void testCompactionTaskToClientCompactionTaskQuery() throws 
IOException
+  {
+    final CompactionTask task = 
createCompactionTask(CLIENT_COMPACTION_TASK_TRANSFORM_SPEC);
+
+    final ClientCompactionTaskQuery expected = 
createCompactionTaskQuery(task.getId(), CLIENT_COMPACTION_TASK_TRANSFORM_SPEC);
+
+    final byte[] json = MAPPER.writeValueAsBytes(task);
+    final ClientCompactionTaskQuery actual = (ClientCompactionTaskQuery) 
MAPPER.readValue(json, ClientTaskQuery.class);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void 
testCompactionTaskToClientCompactionTaskQueryWithoutTransformSpec() throws 
IOException
+  {
+    final CompactionTask task = createCompactionTask(null);
+
+    final ClientCompactionTaskQuery expected = 
createCompactionTaskQuery(task.getId(), null);
+
+    final byte[] json = MAPPER.writeValueAsBytes(task);
+    final ClientCompactionTaskQuery actual = (ClientCompactionTaskQuery) 
MAPPER.readValue(json, ClientTaskQuery.class);
+
+    // Verify that CompactionTask has added new parameters into the context
+    Assert.assertNotEquals(expected, actual);
+
+    expected.getContext().put(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, 
LookupLoadingSpec.Mode.NONE.toString());
+    Assert.assertEquals(expected, actual);
+  }
+
+  private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper 
objectMapper)
+  {
+    final GuiceAnnotationIntrospector guiceIntrospector = new 
GuiceAnnotationIntrospector();
+    objectMapper.setAnnotationIntrospectors(
+        new AnnotationIntrospectorPair(
+            guiceIntrospector,
+            objectMapper.getSerializationConfig().getAnnotationIntrospector()
         ),
-        new ClientCompactionTaskGranularitySpec(Granularities.DAY, 
Granularities.HOUR, true),
-        new 
ClientCompactionTaskDimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts",
 "dim"))),
-        new AggregatorFactory[] {new CountAggregatorFactory("cnt")},
-        new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", 
"foo", null)),
-        ImmutableMap.of("key", "value")
+        new AnnotationIntrospectorPair(
+            guiceIntrospector,
+            objectMapper.getDeserializationConfig().getAnnotationIntrospector()
+        )
     );
+    GuiceInjectableValues injectableValues = new GuiceInjectableValues(
+        GuiceInjectors.makeStartupInjectorWithModules(
+            ImmutableList.of(
+                binder -> {
+                  
binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
+                  binder.bind(ChatHandlerProvider.class).toInstance(new 
NoopChatHandlerProvider());
+                  
binder.bind(RowIngestionMetersFactory.class).toInstance(ROW_INGESTION_METERS_FACTORY);
+                  
binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT);
+                  binder.bind(SegmentCacheManagerFactory.class).toInstance(new 
SegmentCacheManagerFactory(objectMapper));
+                  
binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER);
+                  binder.bind(OverlordClient.class).toInstance(new 
NoopOverlordClient());
+                }
+            )
+        )
+    );
+    objectMapper.setInjectableValues(injectableValues);
+    objectMapper.registerSubtypes(new 
NamedType(ParallelIndexTuningConfig.class, "index_parallel"));
+    return objectMapper;
+  }
 
-    final byte[] json = mapper.writeValueAsBytes(query);
-    final CompactionTask task = (CompactionTask) mapper.readValue(json, 
Task.class);
-
+  private void assertQueryToTask(ClientCompactionTaskQuery query, 
CompactionTask task)
+  {
     Assert.assertEquals(query.getId(), task.getId());
     Assert.assertEquals(query.getDataSource(), task.getDataSource());
     Assert.assertTrue(task.getIoConfig().getInputSpec() instanceof 
CompactionIntervalSpec);
@@ -226,8 +283,8 @@ public class ClientCompactionTaskQuerySerdeTest
         task.getDimensionsSpec().getDimensions()
     );
     Assert.assertEquals(
-        query.getTransformSpec().getFilter(),
-        task.getTransformSpec().getFilter()
+        query.getTransformSpec(),
+        task.getTransformSpec()
     );
     Assert.assertArrayEquals(
         query.getMetricsSpec(),
@@ -235,16 +292,53 @@ public class ClientCompactionTaskQuerySerdeTest
     );
   }
 
-  @Test
-  public void testCompactionTaskToClientCompactionTaskQuery() throws 
IOException
+  private ClientCompactionTaskQuery createCompactionTaskQuery(String id, 
ClientCompactionTaskTransformSpec transformSpec)
   {
-    final ObjectMapper mapper = setupInjectablesInObjectMapper(new 
DefaultObjectMapper());
-    final CompactionTask.Builder builder = new CompactionTask.Builder(
+    Map<String, Object> context = new HashMap<>();
+    context.put("key", "value");
+    return new ClientCompactionTaskQuery(
+        id,
         "datasource",
-        new SegmentCacheManagerFactory(mapper),
-        new RetryPolicyFactory(new RetryPolicyConfig())
+        new ClientCompactionIOConfig(
+            new ClientCompactionIntervalSpec(Intervals.of("2019/2020"), 
"testSha256OfSortedSegmentIds"), true
+        ),
+        new ClientCompactionTaskQueryTuningConfig(
+            100,
+            new OnheapIncrementalIndex.Spec(true),
+            40000,
+            2000L,
+            30000L,
+            SEGMENTS_SPLIT_HINT_SPEC,
+            DYNAMIC_PARTITIONS_SPEC,
+            INDEX_SPEC,
+            INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS,
+            2,
+            1000L,
+            TmpFileSegmentWriteOutMediumFactory.instance(),
+            100,
+            5,
+            1000L,
+            new Duration(3000L),
+            7,
+            1000,
+            100,
+            2
+        ),
+        CLIENT_COMPACTION_TASK_GRANULARITY_SPEC,
+        new 
ClientCompactionTaskDimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts",
 "dim"))),
+        METRICS_SPEC,
+        transformSpec,
+        context
     );
-    final CompactionTask task = builder
+  }
+
+  private CompactionTask 
createCompactionTask(ClientCompactionTaskTransformSpec transformSpec)
+  {
+    CompactionTask.Builder compactionTaskBuilder = new CompactionTask.Builder(
+        "datasource",
+        new SegmentCacheManagerFactory(MAPPER),
+        new RetryPolicyFactory(new RetryPolicyConfig())
+    )
         .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), 
"testSha256OfSortedSegmentIds"), true)
         .tuningConfig(
             new ParallelIndexTuningConfig(
@@ -256,18 +350,10 @@ public class ClientCompactionTaskQuerySerdeTest
                 null,
                 null,
                 null,
-                new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), 10),
-                new DynamicPartitionsSpec(100, 30000L),
-                IndexSpec.builder()
-                         .withDimensionCompression(CompressionStrategy.LZ4)
-                         .withMetricCompression(CompressionStrategy.LZF)
-                         .withLongEncoding(LongEncodingStrategy.LONGS)
-                         .build(),
-                IndexSpec.builder()
-                         .withDimensionCompression(CompressionStrategy.LZ4)
-                         
.withMetricCompression(CompressionStrategy.UNCOMPRESSED)
-                         .withLongEncoding(LongEncodingStrategy.AUTO)
-                         .build(),
+                SEGMENTS_SPLIT_HINT_SPEC,
+                DYNAMIC_PARTITIONS_SPEC,
+                INDEX_SPEC,
+                INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS,
                 2,
                 null,
                 null,
@@ -290,100 +376,17 @@ public class ClientCompactionTaskQuerySerdeTest
                 null
             )
         )
-        .granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, 
true))
+        .granularitySpec(CLIENT_COMPACTION_TASK_GRANULARITY_SPEC)
         .dimensionsSpec(
             DimensionsSpec.builder()
                           
.setDimensions(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")))
                           .setDimensionExclusions(ImmutableList.of("__time", 
"val"))
                           .build()
         )
-        .metricsSpec(new AggregatorFactory[] {new 
CountAggregatorFactory("cnt")})
-        .transformSpec(new ClientCompactionTaskTransformSpec(new 
SelectorDimFilter("dim1", "foo", null)))
-        .build();
+        .metricsSpec(METRICS_SPEC)
+        .transformSpec(transformSpec)
+        .context(ImmutableMap.of("key", "value"));
 
-    final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery(
-        task.getId(),
-        "datasource",
-        new ClientCompactionIOConfig(
-            new ClientCompactionIntervalSpec(
-                Intervals.of("2019/2020"),
-                "testSha256OfSortedSegmentIds"
-            ),
-            true
-        ),
-        new ClientCompactionTaskQueryTuningConfig(
-            100,
-            new OnheapIncrementalIndex.Spec(true),
-            40000,
-            2000L,
-            30000L,
-            new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), 10),
-            new DynamicPartitionsSpec(100, 30000L),
-            IndexSpec.builder()
-                     .withDimensionCompression(CompressionStrategy.LZ4)
-                     .withMetricCompression(CompressionStrategy.LZF)
-                     .withLongEncoding(LongEncodingStrategy.LONGS)
-                     .build(),
-            IndexSpec.builder()
-                     .withDimensionCompression(CompressionStrategy.LZ4)
-                     .withMetricCompression(CompressionStrategy.UNCOMPRESSED)
-                     .withLongEncoding(LongEncodingStrategy.AUTO)
-                     .build(),
-            2,
-            1000L,
-            TmpFileSegmentWriteOutMediumFactory.instance(),
-            100,
-            5,
-            1000L,
-            new Duration(3000L),
-            7,
-            1000,
-            100,
-            2
-        ),
-        new ClientCompactionTaskGranularitySpec(Granularities.DAY, 
Granularities.HOUR, true),
-        new 
ClientCompactionTaskDimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts",
 "dim"))),
-        new AggregatorFactory[] {new CountAggregatorFactory("cnt")},
-        new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", 
"foo", null)),
-        new HashMap<>()
-    );
-
-    final byte[] json = mapper.writeValueAsBytes(task);
-    final ClientCompactionTaskQuery actual = (ClientCompactionTaskQuery) 
mapper.readValue(json, ClientTaskQuery.class);
-
-    Assert.assertEquals(expected, actual);
-  }
-
-  private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper 
objectMapper)
-  {
-    final GuiceAnnotationIntrospector guiceIntrospector = new 
GuiceAnnotationIntrospector();
-    objectMapper.setAnnotationIntrospectors(
-        new AnnotationIntrospectorPair(
-            guiceIntrospector,
-            objectMapper.getSerializationConfig().getAnnotationIntrospector()
-        ),
-        new AnnotationIntrospectorPair(
-            guiceIntrospector,
-            objectMapper.getDeserializationConfig().getAnnotationIntrospector()
-        )
-    );
-    GuiceInjectableValues injectableValues = new GuiceInjectableValues(
-        GuiceInjectors.makeStartupInjectorWithModules(
-            ImmutableList.of(
-                binder -> {
-                  
binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
-                  binder.bind(ChatHandlerProvider.class).toInstance(new 
NoopChatHandlerProvider());
-                  
binder.bind(RowIngestionMetersFactory.class).toInstance(ROW_INGESTION_METERS_FACTORY);
-                  
binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT);
-                  binder.bind(SegmentCacheManagerFactory.class).toInstance(new 
SegmentCacheManagerFactory(objectMapper));
-                  
binder.bind(AppenderatorsManager.class).toInstance(APPENDERATORS_MANAGER);
-                  binder.bind(OverlordClient.class).toInstance(new 
NoopOverlordClient());
-                }
-            )
-        )
-    );
-    objectMapper.setInjectableValues(injectableValues);
-    objectMapper.registerSubtypes(new 
NamedType(ParallelIndexTuningConfig.class, "index_parallel"));
-    return objectMapper;
+    return compactionTaskBuilder.build();
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 4ccd3f49811..7a39b46631c 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -133,6 +133,7 @@ import 
org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
 import 
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.ResourceAction;
@@ -1738,6 +1739,35 @@ public class CompactionTaskTest
     Assert.assertNull(chooseFinestGranularityHelper(input));
   }
 
+  @Test
+  public void testGetDefaultLookupLoadingSpec()
+  {
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        segmentCacheManagerFactory,
+        RETRY_POLICY_FACTORY
+    );
+    final CompactionTask task = builder
+        .interval(Intervals.of("2000-01-01/2000-01-02"))
+        .build();
+    Assert.assertEquals(LookupLoadingSpec.NONE, task.getLookupLoadingSpec());
+  }
+
+  @Test
+  public void testGetDefaultLookupLoadingSpecWithTransformSpec()
+  {
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        segmentCacheManagerFactory,
+        RETRY_POLICY_FACTORY
+    );
+    final CompactionTask task = builder
+        .interval(Intervals.of("2000-01-01/2000-01-02"))
+        .transformSpec(new ClientCompactionTaskTransformSpec(new 
SelectorDimFilter("dim1", "foo", null)))
+        .build();
+    Assert.assertEquals(LookupLoadingSpec.ALL, task.getLookupLoadingSpec());
+  }
+
   private Granularity chooseFinestGranularityHelper(List<Granularity> 
granularities)
   {
     SettableSupplier<Granularity> queryGranularity = new SettableSupplier<>();
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java
index c2957f6688c..33502ecb3fb 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -128,4 +129,10 @@ public class TaskTest
         TASK::getInputSourceResources
     );
   }
+
+  @Test
+  public void testGetLookupLoadingSpec()
+  {
+    Assert.assertEquals(LookupLoadingSpec.ALL, TASK.getLookupLoadingSpec());
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java
 
b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java
index 88524fe27f9..4665bdd18cf 100644
--- 
a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java
+++ 
b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupLoadingSpec.java
@@ -22,6 +22,11 @@ package org.apache.druid.server.lookup.cache;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.error.InvalidInput;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -39,6 +44,10 @@ import java.util.Set;
  */
 public class LookupLoadingSpec
 {
+
+  public static final String CTX_LOOKUP_LOADING_MODE = "lookupLoadingMode";
+  public static final String CTX_LOOKUPS_TO_LOAD = "lookupsToLoad";
+
   public enum Mode
   {
     ALL, NONE, ONLY_REQUIRED
@@ -80,6 +89,50 @@ public class LookupLoadingSpec
     return lookupsToLoad;
   }
 
+  public static LookupLoadingSpec createFromContext(Map<String, Object> 
context, LookupLoadingSpec defaultSpec)
+  {
+    if (context == null) {
+      return defaultSpec;
+    }
+
+    final Object lookupModeValue = context.get(CTX_LOOKUP_LOADING_MODE);
+    if (lookupModeValue == null) {
+      return defaultSpec;
+    }
+
+    final LookupLoadingSpec.Mode lookupLoadingMode;
+    try {
+      lookupLoadingMode = 
LookupLoadingSpec.Mode.valueOf(lookupModeValue.toString());
+    }
+    catch (IllegalArgumentException e) {
+      throw InvalidInput.exception("Invalid value of %s[%s]. Allowed values 
are %s",
+                                   CTX_LOOKUP_LOADING_MODE, 
lookupModeValue.toString(), Arrays.asList(LookupLoadingSpec.Mode.values()));
+    }
+
+    if (lookupLoadingMode == Mode.NONE) {
+      return NONE;
+    } else if (lookupLoadingMode == Mode.ALL) {
+      return ALL;
+    } else if (lookupLoadingMode == Mode.ONLY_REQUIRED) {
+      Collection<String> lookupsToLoad;
+      try {
+        lookupsToLoad = (Collection<String>) context.get(CTX_LOOKUPS_TO_LOAD);
+      }
+      catch (ClassCastException e) {
+        throw InvalidInput.exception("Invalid value of %s[%s]. Please provide 
a comma-separated list of "
+                                     + "lookup names. For example: 
[\"lookupName1\", \"lookupName2\"]",
+                                     CTX_LOOKUPS_TO_LOAD, 
context.get(CTX_LOOKUPS_TO_LOAD));
+      }
+
+      if (lookupsToLoad == null || lookupsToLoad.isEmpty()) {
+        throw InvalidInput.exception("Set of lookups to load cannot be %s for 
mode[ONLY_REQUIRED].", lookupsToLoad);
+      }
+      return LookupLoadingSpec.loadOnly(new HashSet<>(lookupsToLoad));
+    } else {
+      return defaultSpec;
+    }
+  }
+
   @Override
   public String toString()
   {
@@ -88,4 +141,23 @@ public class LookupLoadingSpec
            ", lookupsToLoad=" + lookupsToLoad +
            '}';
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    LookupLoadingSpec that = (LookupLoadingSpec) o;
+    return mode == that.mode && Objects.equals(lookupsToLoad, 
that.lookupsToLoad);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(mode, lookupsToLoad);
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java
 
b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java
index 8d0a7a5518a..d36aff6914c 100644
--- 
a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupLoadingSpecTest.java
@@ -19,13 +19,20 @@
 
 package org.apache.druid.server.lookup.cache;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
 import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
+import java.util.Arrays;
 import java.util.Set;
 
+@RunWith(JUnitParamsRunner.class)
 public class LookupLoadingSpecTest
 {
   @Test
@@ -59,4 +66,121 @@ public class LookupLoadingSpecTest
     DruidException exception = Assert.assertThrows(DruidException.class, () -> 
LookupLoadingSpec.loadOnly(null));
     Assert.assertEquals("Expected non-null set of lookups to load.", 
exception.getMessage());
   }
+
+  @Test
+  public void testCreateLookupLoadingSpecFromEmptyContext()
+  {
+    // Default spec is returned in the case of context not having the lookup 
keys.
+    Assert.assertEquals(
+        LookupLoadingSpec.ALL,
+        LookupLoadingSpec.createFromContext(
+            ImmutableMap.of(),
+            LookupLoadingSpec.ALL
+        )
+    );
+
+    Assert.assertEquals(
+        LookupLoadingSpec.NONE,
+        LookupLoadingSpec.createFromContext(
+            ImmutableMap.of(),
+            LookupLoadingSpec.NONE
+        )
+    );
+  }
+
+  @Test
+  public void testCreateLookupLoadingSpecFromNullContext()
+  {
+    // Default spec is returned in the case of context=null.
+    Assert.assertEquals(
+        LookupLoadingSpec.NONE,
+        LookupLoadingSpec.createFromContext(
+            null,
+            LookupLoadingSpec.NONE
+        )
+    );
+
+    Assert.assertEquals(
+        LookupLoadingSpec.ALL,
+        LookupLoadingSpec.createFromContext(
+            null,
+            LookupLoadingSpec.ALL
+        )
+    );
+  }
+
+  @Test
+  public void testCreateLookupLoadingSpecFromContext()
+  {
+    // Only required lookups are returned in the case of context having the 
lookup keys.
+    Assert.assertEquals(
+        LookupLoadingSpec.loadOnly(ImmutableSet.of("lookup1", "lookup2")),
+        LookupLoadingSpec.createFromContext(
+            ImmutableMap.of(
+                LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, 
Arrays.asList("lookup1", "lookup2"),
+                LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, 
LookupLoadingSpec.Mode.ONLY_REQUIRED
+            ),
+            LookupLoadingSpec.ALL
+        )
+    );
+
+    // No lookups are returned in the case of context having mode=NONE, 
irrespective of the default spec.
+    Assert.assertEquals(
+        LookupLoadingSpec.NONE,
+        LookupLoadingSpec.createFromContext(
+            ImmutableMap.of(
+                LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, 
LookupLoadingSpec.Mode.NONE),
+            LookupLoadingSpec.ALL
+        )
+    );
+
+    // All lookups are returned in the case of context having mode=ALL, 
irrespective of the default spec.
+    Assert.assertEquals(
+        LookupLoadingSpec.ALL,
+        LookupLoadingSpec.createFromContext(
+            ImmutableMap.of(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, 
LookupLoadingSpec.Mode.ALL),
+            LookupLoadingSpec.NONE
+        )
+    );
+  }
+
+  @Test
+  @Parameters(
+      {
+          "NONE1",
+          "A",
+          "Random mode",
+          "all",
+          "only required",
+          "none"
+      }
+  )
+  public void testCreateLookupLoadingSpecFromInvalidModeInContext(String mode)
+  {
+    final DruidException exception = Assert.assertThrows(DruidException.class, 
() -> LookupLoadingSpec.createFromContext(
+        ImmutableMap.of(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, mode), 
LookupLoadingSpec.ALL));
+    Assert.assertEquals(StringUtils.format("Invalid value of %s[%s]. Allowed 
values are [ALL, NONE, ONLY_REQUIRED]",
+                                           
LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, mode), exception.getMessage());
+  }
+
+  @Test
+  @Parameters(
+      {
+          "foo bar",
+          "foo]"
+      }
+  )
+  public void testCreateLookupLoadingSpecFromInvalidLookupsInContext(Object 
lookupsToLoad)
+  {
+    final DruidException exception = Assert.assertThrows(DruidException.class, 
() ->
+        LookupLoadingSpec.createFromContext(
+            ImmutableMap.of(
+                LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, lookupsToLoad,
+                LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE, 
LookupLoadingSpec.Mode.ONLY_REQUIRED),
+            LookupLoadingSpec.ALL)
+    );
+    Assert.assertEquals(StringUtils.format("Invalid value of %s[%s]. Please 
provide a comma-separated list of "
+                                           + "lookup names. For example: 
[\"lookupName1\", \"lookupName2\"]",
+                                           
LookupLoadingSpec.CTX_LOOKUPS_TO_LOAD, lookupsToLoad), exception.getMessage());
+  }
 }
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
index 99f721bffaa..b7e2de3e66b 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
@@ -80,8 +80,6 @@ public class PlannerContext
   public static final String CTX_SQL_CURRENT_TIMESTAMP = "sqlCurrentTimestamp";
   public static final String CTX_SQL_TIME_ZONE = "sqlTimeZone";
   public static final String CTX_SQL_JOIN_ALGORITHM = "sqlJoinAlgorithm";
-  public static final String CTX_LOOKUP_LOADING_MODE = "lookupLoadingMode";
-  public static final String CTX_LOOKUPS_TO_LOAD = "lookupsToLoad";
   private static final JoinAlgorithm DEFAULT_SQL_JOIN_ALGORITHM = 
JoinAlgorithm.BROADCAST;
 
   /**
@@ -357,7 +355,7 @@ public class PlannerContext
   }
 
   /**
-   * Returns the lookup to load for a given task.
+   * Lookup loading spec used if this context corresponds to an MSQ task.
    */
   public LookupLoadingSpec getLookupLoadingSpec()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to