This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ef94c9deeb Add support for selective loading of broadcast datasources 
in the task layer (#17027)
5ef94c9deeb is described below

commit 5ef94c9deebea6cc52dcc504b83f451c73d5c036
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Thu Sep 12 13:30:28 2024 -0400

    Add support for selective loading of broadcast datasources in the task 
layer (#17027)
    
    Tasks control the loading of broadcast datasources via
    BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec(). By default,
    tasks download all broadcast datasources unless they override this method, as
    the kill and MSQ controller tasks do.
    
    The CliPeon command-line option --loadBroadcastSegments is deprecated in
    favor of --loadBroadcastDatasourceMode.
    
    Broadcast datasources can be specified in SQL queries through JOIN and FROM
    clauses, or obtained from other sources such as lookups. To this effect, we have
    introduced a BroadcastDatasourceLoadingSpec. Finding the set of broadcast
    datasources during SQL planning will be done in a follow-up, which will apply
    only to MSQ tasks, so that they load only the required broadcast datasources. This
    PR primarily focuses on the skeletal changes around BroadcastDatasourceLoadingSpec
    and integrating it from [...]
    
    Currently, only kill tasks and MSQ controller tasks skip loading broadcast 
datasources.
---
 .../k8s/overlord/common/DruidK8sConstants.java     |   1 +
 .../k8s/overlord/taskadapter/K8sTaskAdapter.java   |   6 +-
 .../taskadapter/PodTemplateTaskAdapter.java        |   4 +
 .../taskadapter/PodTemplateTaskAdapterTest.java    |  43 +++++-
 .../src/test/resources/expectedNoopJob.yaml        |   2 +
 .../src/test/resources/expectedNoopJobBase.yaml    |   2 +
 .../src/test/resources/expectedNoopJobLongIds.yaml |   2 +
 .../test/resources/expectedNoopJobNoTaskJson.yaml  |   2 +
 .../resources/expectedNoopJobTlsEnabledBase.yaml   |   2 +
 .../druid/msq/indexing/MSQControllerTask.java      |   7 +
 .../druid/msq/indexing/MSQControllerTaskTest.java  |  17 +++
 .../common/task/KillUnusedSegmentsTask.java        |   7 +
 .../apache/druid/indexing/common/task/Task.java    |  13 +-
 .../druid/indexing/overlord/ForkingTaskRunner.java |   6 +-
 .../common/task/KillUnusedSegmentsTaskTest.java    |  11 ++
 .../BroadcastDatasourceLoadingSpec.java            | 170 +++++++++++++++++++++
 .../server/coordination/SegmentBootstrapper.java   |  32 +++-
 .../server/metrics/DataSourceTaskIdHolder.java     |  15 +-
 .../BroadcastDatasourceLoadingSpecTest.java        | 166 ++++++++++++++++++++
 .../coordination/SegmentBootstrapperCacheTest.java |  10 +-
 .../coordination/SegmentBootstrapperTest.java      | 145 +++++++++++++++++-
 .../main/java/org/apache/druid/cli/CliPeon.java    |  31 +++-
 22 files changed, 673 insertions(+), 21 deletions(-)

diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
index 568f8ed5a11..644a7f109b2 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java
@@ -37,6 +37,7 @@ public class DruidK8sConstants
   public static final String TASK_JSON_ENV = "TASK_JSON";
   public static final String TASK_DIR_ENV = "TASK_DIR";
   public static final String TASK_ID_ENV = "TASK_ID";
+  public static final String LOAD_BROADCAST_DATASOURCE_MODE_ENV = 
"LOAD_BROADCAST_DATASOURCE_MODE";
   public static final String LOAD_BROADCAST_SEGMENTS_ENV = 
"LOAD_BROADCAST_SEGMENTS";
   public static final String JAVA_OPTS = "JAVA_OPTS";
   public static final String DRUID_HOST_ENV = "druid_host";
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
index c15698803d9..cc689f925f4 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
@@ -444,12 +444,16 @@ public abstract class K8sTaskAdapter implements 
TaskAdapter
     }
 
     // If the task type is queryable, we need to load broadcast segments on 
the peon, used for
-    // join queries
+    // join queries. This is replaced by --loadBroadcastDatasourceMode option, 
but is preserved here
+    // for backwards compatibility and can be removed in a future release.
     if (task.supportsQueries()) {
       command.add("--loadBroadcastSegments");
       command.add("true");
     }
 
+    command.add("--loadBroadcastDatasourceMode");
+    command.add(task.getBroadcastDatasourceLoadingSpec().getMode().toString());
+
     command.add("--taskId");
     command.add(task.getId());
     log.info(
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
index e8aaf1bbab3..321fe3fcb3e 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
@@ -280,6 +280,10 @@ public class PodTemplateTaskAdapter implements TaskAdapter
             .withName(DruidK8sConstants.TASK_ID_ENV)
             .withValue(task.getId())
             .build(),
+        new EnvVarBuilder()
+            .withName(DruidK8sConstants.LOAD_BROADCAST_DATASOURCE_MODE_ENV)
+            
.withValue(task.getBroadcastDatasourceLoadingSpec().getMode().toString())
+            .build(),
         new EnvVarBuilder()
             .withName(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV)
             .withValue(Boolean.toString(task.supportsQueries()))
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
index ac2aaa70558..b25f23a25dd 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
@@ -46,6 +46,7 @@ import 
org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig
 import org.apache.druid.k8s.overlord.execution.Selector;
 import 
org.apache.druid.k8s.overlord.execution.SelectorBasedPodTemplateSelectStrategy;
 import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
 import org.apache.druid.tasklogs.TaskLogs;
 import org.easymock.EasyMock;
 import org.junit.Assert;
@@ -537,6 +538,7 @@ public class PodTemplateTaskAdapterTest
     EasyMock.expect(task.getId()).andReturn("id").anyTimes();
     EasyMock.expect(task.getGroupId()).andReturn("groupid").anyTimes();
     EasyMock.expect(task.getDataSource()).andReturn("datasource").anyTimes();
+    
EasyMock.expect(task.getBroadcastDatasourceLoadingSpec()).andReturn(BroadcastDatasourceLoadingSpec.ALL).anyTimes();
 
     EasyMock.replay(task);
     Job actual = adapter.fromTask(task);
@@ -550,7 +552,46 @@ public class PodTemplateTaskAdapterTest
   }
 
   @Test
-  public void test_fromTask_withIndexKafkaPodTemplateInRuntimeProperites() 
throws IOException
+  public void test_fromTask_withBroadcastDatasourceLoadingModeAll() throws IOException
+  {
+    // One template file backs both the base and the queryable pod template.
+    final Path noopTemplate = Files.createFile(tempDir.resolve("noop.yaml"));
+    mapper.writeValue(noopTemplate.toFile(), podTemplateSpec);
+
+    final Properties runtimeProps = new Properties();
+    runtimeProps.setProperty("druid.indexer.runner.k8s.podTemplate.base", noopTemplate.toString());
+    runtimeProps.setProperty("druid.indexer.runner.k8s.podTemplate.queryable", noopTemplate.toString());
+
+    final PodTemplateTaskAdapter podTemplateAdapter = new PodTemplateTaskAdapter(
+        taskRunnerConfig,
+        taskConfig,
+        node,
+        mapper,
+        runtimeProps,
+        taskLogs,
+        dynamicConfigRef
+    );
+
+    // A queryable task that relies on the default (ALL) broadcast datasource loading spec.
+    final Task mockTask = EasyMock.mock(Task.class);
+    EasyMock.expect(mockTask.supportsQueries()).andReturn(true);
+    EasyMock.expect(mockTask.getType()).andReturn("queryable").anyTimes();
+    EasyMock.expect(mockTask.getId()).andReturn("id").anyTimes();
+    EasyMock.expect(mockTask.getGroupId()).andReturn("groupid").anyTimes();
+    EasyMock.expect(mockTask.getDataSource()).andReturn("datasource").anyTimes();
+    EasyMock.expect(mockTask.getBroadcastDatasourceLoadingSpec()).andReturn(BroadcastDatasourceLoadingSpec.ALL).anyTimes();
+
+    EasyMock.replay(mockTask);
+    final Job job = podTemplateAdapter.fromTask(mockTask);
+    EasyMock.verify(mockTask);
+
+    // The first container must expose the loading mode through the dedicated env var.
+    final String modeEnvValue = job.getSpec()
+                                   .getTemplate()
+                                   .getSpec()
+                                   .getContainers()
+                                   .get(0)
+                                   .getEnv()
+                                   .stream()
+                                   .filter(env -> env.getName().equals(DruidK8sConstants.LOAD_BROADCAST_DATASOURCE_MODE_ENV))
+                                   .collect(Collectors.toList())
+                                   .get(0)
+                                   .getValue();
+    Assertions.assertEquals(BroadcastDatasourceLoadingSpec.Mode.ALL.toString(), modeEnvValue);
+  }
+
+  @Test
+  public void test_fromTask_withIndexKafkaPodTemplateInRuntimeProperties() 
throws IOException
   {
     Path baseTemplatePath = Files.createFile(tempDir.resolve("base.yaml"));
     mapper.writeValue(baseTemplatePath.toFile(), podTemplateSpec);
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
index ddae7c0567f..ac539c5da15 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
@@ -45,6 +45,8 @@ spec:
               value: "/tmp"
             - name: "TASK_ID"
               value: "id"
+            - name: "LOAD_BROADCAST_DATASOURCE_MODE"
+              value: "ALL"
             - name: "LOAD_BROADCAST_SEGMENTS"
               value: "false"
             - name: "TASK_JSON"
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobBase.yaml
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobBase.yaml
index 532c3dd53e8..f7c2ff958bb 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobBase.yaml
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobBase.yaml
@@ -45,6 +45,8 @@ spec:
               value: "/tmp"
             - name: "TASK_ID"
               value: "id"
+            - name: "LOAD_BROADCAST_DATASOURCE_MODE"
+              value: "ALL"
             - name: "LOAD_BROADCAST_SEGMENTS"
               value: "false"
             - name: "TASK_JSON"
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
index d6c316dcdde..3a3af1528b5 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
@@ -44,6 +44,8 @@ spec:
               value: "/tmp"
             - name: "TASK_ID"
               value: 
"api-issued_kill_wikipedia3_omjobnbc_1000-01-01T00:00:00.000Z_2023-05-14T00:00:00.000Z_2023-05-15T17:03:01.220Z"
+            - name: "LOAD_BROADCAST_DATASOURCE_MODE"
+              value: "ALL"
             - name: "LOAD_BROADCAST_SEGMENTS"
               value: "false"
             - name: "TASK_JSON"
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
index 90ae9970959..ec7f9a06248 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
@@ -43,6 +43,8 @@ spec:
               value: "/tmp"
             - name: "TASK_ID"
               value: "id"
+            - name: "LOAD_BROADCAST_DATASOURCE_MODE"
+              value: "ALL"
             - name: "LOAD_BROADCAST_SEGMENTS"
               value: "false"
           image: one
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml
index 0e52beac9e3..84457fb3175 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml
@@ -44,6 +44,8 @@ spec:
               value: "/tmp"
             - name: "TASK_ID"
               value: "id"
+            - name: "LOAD_BROADCAST_DATASOURCE_MODE"
+              value: "ALL"
             - name: "LOAD_BROADCAST_SEGMENTS"
               value: "false"
             - name: "TASK_JSON"
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
index c3f6feaab24..4ddc8274b9d 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
@@ -59,6 +59,7 @@ import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.rpc.StandardRetryPolicy;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
 import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
@@ -374,4 +375,10 @@ public class MSQControllerTask extends AbstractTask 
implements ClientTaskQuery,
   {
     return LookupLoadingSpec.NONE;
   }
+
+  /**
+   * The controller task never loads broadcast datasources; unlike the {@link Task} default,
+   * this ignores any loading mode supplied in the task context.
+   */
+  @Override
+  public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec()
+  {
+    return BroadcastDatasourceLoadingSpec.NONE;
+  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
index 76586c1e108..8d974285fb5 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
@@ -35,6 +35,7 @@ import 
org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.scan.ScanQuery;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
 import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.sql.calcite.planner.ColumnMapping;
 import org.apache.druid.sql.calcite.planner.ColumnMappings;
@@ -104,6 +105,22 @@ public class MSQControllerTaskTest
     Assert.assertEquals(LookupLoadingSpec.NONE, 
controllerTask.getLookupLoadingSpec());
   }
 
+  @Test
+  public void testGetDefaultBroadcastDatasourceLoadingSpec()
+  {
+    // Only the query spec is supplied; every other constructor argument is left null.
+    final MSQControllerTask task = new MSQControllerTask(null, MSQ_SPEC, null, null, null, null, null, null);
+    final BroadcastDatasourceLoadingSpec actualSpec = task.getBroadcastDatasourceLoadingSpec();
+    Assert.assertEquals(BroadcastDatasourceLoadingSpec.NONE, actualSpec);
+  }
+
   @Test
   public void testGetLookupLoadingSpecUsingLookupLoadingInfoInContext()
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index e1f6d2915ee..06082a988d9 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -47,6 +47,7 @@ import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
 import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.timeline.DataSegment;
@@ -412,6 +413,12 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
     return LookupLoadingSpec.NONE;
   }
 
+  /**
+   * Kill tasks never load broadcast datasources; unlike the {@link Task} default,
+   * this ignores any loading mode supplied in the task context.
+   */
+  @Override
+  public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec()
+  {
+    return BroadcastDatasourceLoadingSpec.NONE;
+  }
+
   @Override
   public boolean isReady(TaskActionClient taskActionClient) throws Exception
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 003b39e606b..cacdc47f520 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -41,13 +41,13 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
+import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
 import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.server.security.ResourceType;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -329,9 +329,18 @@ public interface Task
    * This behaviour can be overridden by passing parameters {@link 
LookupLoadingSpec#CTX_LOOKUP_LOADING_MODE}
    * and {@link LookupLoadingSpec#CTX_LOOKUPS_TO_LOAD} in the task context.
    */
-  @Nullable
   default LookupLoadingSpec getLookupLoadingSpec()
   {
     return LookupLoadingSpec.createFromContext(getContext(), 
LookupLoadingSpec.ALL);
   }
+
+  /**
+   * Specifies the list of broadcast datasources to load for this task. Tasks 
load ALL broadcast datasources by default.
+   * This behavior can be overridden by passing parameters {@link 
BroadcastDatasourceLoadingSpec#CTX_BROADCAST_DATASOURCE_LOADING_MODE}
+   * and {@link 
BroadcastDatasourceLoadingSpec#CTX_BROADCAST_DATASOURCES_TO_LOAD} in the task 
context.
+   */
+  default BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec()
+  {
+    return BroadcastDatasourceLoadingSpec.createFromContext(getContext(), 
BroadcastDatasourceLoadingSpec.ALL);
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index c676877c110..7e4dd6c39c2 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -376,12 +376,16 @@ public class ForkingTaskRunner
                         }
 
                         // If the task type is queryable, we need to load 
broadcast segments on the peon, used for
-                        // join queries
+                        // join queries. This is replaced by 
--loadBroadcastDatasourceMode option, but is preserved here
+                        // for backwards compatibility and can be removed in a 
future release.
                         if (task.supportsQueries()) {
                           command.add("--loadBroadcastSegments");
                           command.add("true");
                         }
 
+                        command.add("--loadBroadcastDatasourceMode");
+                        
command.add(task.getBroadcastDatasourceLoadingSpec().getMode().toString());
+
                         if (!taskFile.exists()) {
                           jsonMapper.writeValue(taskFile, task);
                         }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
index fe2b5a51c86..7a4df9de36c 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
@@ -37,6 +37,7 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.metadata.IndexerSqlMetadataStorageCoordinatorTestBase;
+import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
 import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.timeline.DataSegment;
 import org.assertj.core.api.Assertions;
@@ -601,6 +602,16 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
     Assert.assertEquals(LookupLoadingSpec.Mode.NONE, 
task.getLookupLoadingSpec().getMode());
   }
 
+  @Test
+  public void testGetBroadcastDatasourcesToLoad()
+  {
+    final KillUnusedSegmentsTask killTask = new KillUnusedSegmentsTaskBuilder()
+        .dataSource(DATA_SOURCE)
+        .interval(Intervals.of("2019-03-01/2019-04-01"))
+        .build();
+    // Kill tasks must opt out of broadcast datasource loading.
+    final BroadcastDatasourceLoadingSpec.Mode actualMode = killTask.getBroadcastDatasourceLoadingSpec().getMode();
+    Assert.assertEquals(BroadcastDatasourceLoadingSpec.Mode.NONE, actualMode);
+  }
+
   @Test
   public void testKillBatchSizeOneAndLimit4() throws Exception
   {
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java
 
b/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java
new file mode 100644
index 00000000000..3a11027311e
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.InvalidInput;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * This class defines the spec for loading of broadcast datasources for a given task. It contains 2 fields:
+ * <ol>
+ *   <li>{@link BroadcastDatasourceLoadingSpec#mode}: This mode defines whether broadcast datasources need to be
+ *   loaded for the given task, or not. It can take 3 values:
+ *     <ul>
+ *       <li>ALL: Load all the broadcast datasources.</li>
+ *       <li>NONE: Load no broadcast datasources.</li>
+ *       <li>ONLY_REQUIRED: Load only the broadcast datasources defined in broadcastDatasourcesToLoad.</li>
+ *     </ul>
+ *   </li>
+ *   <li>{@link BroadcastDatasourceLoadingSpec#broadcastDatasourcesToLoad}: Defines the broadcast datasources to load
+ *   when the mode is set to ONLY_REQUIRED; null for the other modes.</li>
+ * </ol>
+ * Instances are immutable (final fields, {@link ImmutableSet} of names). Obtain one through {@link #ALL},
+ * {@link #NONE}, {@link #loadOnly(Set)} or {@link #createFromContext(Map, BroadcastDatasourceLoadingSpec)}.
+ */
+public class BroadcastDatasourceLoadingSpec
+{
+
+  public static final String CTX_BROADCAST_DATASOURCE_LOADING_MODE = "broadcastDatasourceLoadingMode";
+  public static final String CTX_BROADCAST_DATASOURCES_TO_LOAD = "broadcastDatasourcesToLoad";
+
+  public enum Mode
+  {
+    ALL, NONE, ONLY_REQUIRED
+  }
+
+  private final Mode mode;
+  @Nullable
+  private final ImmutableSet<String> broadcastDatasourcesToLoad;
+
+  public static final BroadcastDatasourceLoadingSpec ALL = new BroadcastDatasourceLoadingSpec(Mode.ALL, null);
+  public static final BroadcastDatasourceLoadingSpec NONE = new BroadcastDatasourceLoadingSpec(Mode.NONE, null);
+
+  private BroadcastDatasourceLoadingSpec(Mode mode, @Nullable Set<String> broadcastDatasourcesToLoad)
+  {
+    this.mode = mode;
+    this.broadcastDatasourcesToLoad = broadcastDatasourcesToLoad == null ? null : ImmutableSet.copyOf(broadcastDatasourcesToLoad);
+  }
+
+  /**
+   * Creates a BroadcastDatasourceLoadingSpec which loads only the broadcast datasources present in the given set.
+   * Throws an invalid-input exception (built by {@link InvalidInput#exception}) if the set is null.
+   *
+   * @param broadcastDatasourcesToLoad names of the broadcast datasources to load; copied defensively
+   * @return a spec with mode {@link Mode#ONLY_REQUIRED}
+   */
+  public static BroadcastDatasourceLoadingSpec loadOnly(Set<String> broadcastDatasourcesToLoad)
+  {
+    if (broadcastDatasourcesToLoad == null) {
+      throw InvalidInput.exception("Expected non-null set of broadcast datasources to load.");
+    }
+    return new BroadcastDatasourceLoadingSpec(Mode.ONLY_REQUIRED, broadcastDatasourcesToLoad);
+  }
+
+  public Mode getMode()
+  {
+    return mode;
+  }
+
+  /**
+   * @return A non-null immutable set of broadcast datasource names when {@link BroadcastDatasourceLoadingSpec#mode} is ONLY_REQUIRED, null otherwise.
+   */
+  @Nullable
+  public ImmutableSet<String> getBroadcastDatasourcesToLoad()
+  {
+    return broadcastDatasourcesToLoad;
+  }
+
+  /**
+   * Creates a spec from the {@link #CTX_BROADCAST_DATASOURCE_LOADING_MODE} and
+   * {@link #CTX_BROADCAST_DATASOURCES_TO_LOAD} entries of a task context.
+   * Throws an invalid-input exception if the mode value is not the exact name of a {@link Mode}, or if the mode is
+   * ONLY_REQUIRED and the datasources to load are missing, empty, or not a collection of non-null strings.
+   *
+   * @param context     task context; may be null
+   * @param defaultSpec returned when the context is null or does not specify a loading mode
+   * @return {@link #ALL} or {@link #NONE} for the corresponding modes, or an ONLY_REQUIRED spec holding the
+   * datasources listed in the context
+   */
+  public static BroadcastDatasourceLoadingSpec createFromContext(Map<String, Object> context, BroadcastDatasourceLoadingSpec defaultSpec)
+  {
+    if (context == null) {
+      return defaultSpec;
+    }
+
+    final Object broadcastDatasourceModeValue = context.get(CTX_BROADCAST_DATASOURCE_LOADING_MODE);
+    if (broadcastDatasourceModeValue == null) {
+      return defaultSpec;
+    }
+
+    final Mode broadcastDatasourceLoadingMode;
+    try {
+      broadcastDatasourceLoadingMode = Mode.valueOf(broadcastDatasourceModeValue.toString());
+    }
+    catch (IllegalArgumentException e) {
+      throw InvalidInput.exception(
+          "Invalid value of %s[%s]. Allowed values are %s",
+          CTX_BROADCAST_DATASOURCE_LOADING_MODE, broadcastDatasourceModeValue.toString(),
+          Arrays.asList(Mode.values())
+      );
+    }
+
+    switch (broadcastDatasourceLoadingMode) {
+      case NONE:
+        return NONE;
+      case ALL:
+        return ALL;
+      case ONLY_REQUIRED:
+        return loadOnly(readBroadcastDatasourcesToLoad(context));
+      default:
+        // Unreachable for the current Mode values; kept so a newly added mode falls back to the default spec.
+        return defaultSpec;
+    }
+  }
+
+  /**
+   * Reads {@link #CTX_BROADCAST_DATASOURCES_TO_LOAD} from the context and validates it for mode ONLY_REQUIRED.
+   * The value must be a non-empty collection whose elements are all non-null strings. Elements are checked one by
+   * one because a cast to {@code Collection<String>} is unchecked at runtime: without this check a value such as
+   * {@code ["ds1", 1]} would be accepted here and only fail later, wherever the names are read as strings.
+   *
+   * @return a new mutable set containing the requested broadcast datasource names
+   */
+  private static Set<String> readBroadcastDatasourcesToLoad(Map<String, Object> context)
+  {
+    final Object rawValue = context.get(CTX_BROADCAST_DATASOURCES_TO_LOAD);
+    if (rawValue == null || (rawValue instanceof Collection && ((Collection<?>) rawValue).isEmpty())) {
+      throw InvalidInput.exception(
+          "Set of broadcast datasources to load cannot be %s for mode[ONLY_REQUIRED].",
+          rawValue
+      );
+    }
+    if (!isCollectionOfStrings(rawValue)) {
+      throw InvalidInput.exception(
+          "Invalid value of %s[%s]. Please provide a comma-separated list of broadcast datasource names."
+          + " For example: [\"datasourceName1\", \"datasourceName2\"]",
+          CTX_BROADCAST_DATASOURCES_TO_LOAD, rawValue
+      );
+    }
+    @SuppressWarnings("unchecked") // safe: every element was verified to be a String above
+    final Collection<String> names = (Collection<String>) rawValue;
+    return new HashSet<>(names);
+  }
+
+  /**
+   * Returns true if the value is a collection in which every element is a non-null String.
+   */
+  private static boolean isCollectionOfStrings(Object value)
+  {
+    if (!(value instanceof Collection)) {
+      return false;
+    }
+    for (Object element : (Collection<?>) value) {
+      if (!(element instanceof String)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "BroadcastDatasourceLoadingSpec{" +
+           "mode=" + mode +
+           ", broadcastDatasourcesToLoad=" + broadcastDatasourcesToLoad +
+           '}';
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    BroadcastDatasourceLoadingSpec that = (BroadcastDatasourceLoadingSpec) o;
+    return mode == that.mode && Objects.equals(broadcastDatasourcesToLoad, that.broadcastDatasourcesToLoad);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(mode, broadcastDatasourcesToLoad);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java
 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java
index c5b71fbcddc..7eec82e80b1 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentBootstrapper.java
@@ -39,12 +39,14 @@ import 
org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
 import org.apache.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -80,6 +82,8 @@ public class SegmentBootstrapper
 
   private static final EmittingLogger log = new 
EmittingLogger(SegmentBootstrapper.class);
 
+  private final DataSourceTaskIdHolder datasourceHolder;
+
   @Inject
   public SegmentBootstrapper(
       SegmentLoadDropHandler loadDropHandler,
@@ -89,7 +93,8 @@ public class SegmentBootstrapper
       SegmentManager segmentManager,
       ServerTypeConfig serverTypeConfig,
       CoordinatorClient coordinatorClient,
-      ServiceEmitter emitter
+      ServiceEmitter emitter,
+      DataSourceTaskIdHolder datasourceHolder
   )
   {
     this.loadDropHandler = loadDropHandler;
@@ -100,6 +105,7 @@ public class SegmentBootstrapper
     this.serverTypeConfig = serverTypeConfig;
     this.coordinatorClient = coordinatorClient;
     this.emitter = emitter;
+    this.datasourceHolder = datasourceHolder;
   }
 
   @LifecycleStart
@@ -261,10 +267,18 @@ public class SegmentBootstrapper
 
   /**
    * @return a list of bootstrap segments. When bootstrap segments cannot be 
found, an empty list is returned.
+   * The bootstrap segments returned are filtered by the broadcast datasources 
indicated by {@link DataSourceTaskIdHolder#getBroadcastDatasourceLoadingSpec()}
+   * if applicable.
    */
   private List<DataSegment> getBootstrapSegments()
   {
-    log.info("Fetching bootstrap segments from the coordinator.");
+    final BroadcastDatasourceLoadingSpec.Mode mode = 
datasourceHolder.getBroadcastDatasourceLoadingSpec().getMode();
+    if (mode == BroadcastDatasourceLoadingSpec.Mode.NONE) {
+      log.info("Skipping fetch of bootstrap segments.");
+      return ImmutableList.of();
+    }
+
+    log.info("Fetching bootstrap segments from the coordinator with 
BroadcastDatasourceLoadingSpec mode[%s].", mode);
     final Stopwatch stopwatch = Stopwatch.createStarted();
 
     List<DataSegment> bootstrapSegments = new ArrayList<>();
@@ -272,7 +286,18 @@ public class SegmentBootstrapper
     try {
       final BootstrapSegmentsResponse response =
           FutureUtils.getUnchecked(coordinatorClient.fetchBootstrapSegments(), 
true);
-      bootstrapSegments = ImmutableList.copyOf(response.getIterator());
+      if (mode == BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED) {
+        final Set<String> broadcastDatasourcesToLoad = 
datasourceHolder.getBroadcastDatasourceLoadingSpec().getBroadcastDatasourcesToLoad();
+        final List<DataSegment> filteredBroadcast = new ArrayList<>();
+        response.getIterator().forEachRemaining(segment -> {
+          if (broadcastDatasourcesToLoad.contains(segment.getDataSource())) {
+            filteredBroadcast.add(segment);
+          }
+        });
+        bootstrapSegments = filteredBroadcast;
+      } else {
+        bootstrapSegments = ImmutableList.copyOf(response.getIterator());
+      }
     }
     catch (Exception e) {
       log.warn("Error fetching bootstrap segments from the coordinator: [%s]. 
", e.getMessage());
@@ -284,7 +309,6 @@ public class SegmentBootstrapper
       emitter.emit(new 
ServiceMetricEvent.Builder().setMetric("segment/bootstrap/count", 
bootstrapSegments.size()));
       log.info("Fetched [%d] bootstrap segments in [%d]ms.", 
bootstrapSegments.size(), fetchRunMillis);
     }
-
     return bootstrapSegments;
   }
 
diff --git 
a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java
 
b/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java
index 6d2dafd31a5..87002a5157f 100644
--- 
a/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java
+++ 
b/server/src/main/java/org/apache/druid/server/metrics/DataSourceTaskIdHolder.java
@@ -21,15 +21,16 @@ package org.apache.druid.server.metrics;
 
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
+import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
 import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 
-import javax.annotation.Nullable;
-
 public class DataSourceTaskIdHolder
 {
   public static final String DATA_SOURCE_BINDING = "druidDataSource";
   public static final String TASK_ID_BINDING = "druidTaskId";
   public static final String LOOKUPS_TO_LOAD_FOR_TASK = "lookupsToLoadForTask";
+  public static final String BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK = 
"broadcastDatasourcesToLoadForTask";
+
   @Named(DATA_SOURCE_BINDING)
   @Inject(optional = true)
   String dataSource = null;
@@ -37,11 +38,14 @@ public class DataSourceTaskIdHolder
   @Inject(optional = true)
   String taskId = null;
 
-  @Nullable
   @Named(LOOKUPS_TO_LOAD_FOR_TASK)
   @Inject(optional = true)
   LookupLoadingSpec lookupLoadingSpec = LookupLoadingSpec.ALL;
 
+  // Optionally injected: when nothing is bound under BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK, this keeps its
+  // default of ALL, i.e. every broadcast datasource is loaded.
+  @Named(BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK)
+  @Inject(optional = true)
+  BroadcastDatasourceLoadingSpec broadcastDatasourceLoadingSpec = 
BroadcastDatasourceLoadingSpec.ALL;
+
   public String getDataSource()
   {
     return dataSource;
@@ -56,4 +60,9 @@ public class DataSourceTaskIdHolder
   {
     return lookupLoadingSpec;
   }
+
+  /**
+   * @return the broadcast datasource loading spec bound under {@link #BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK},
+   * or {@link BroadcastDatasourceLoadingSpec#ALL} when no such binding was injected.
+   */
+  public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec()
+  {
+    return broadcastDatasourceLoadingSpec;
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java
 
b/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java
new file mode 100644
index 00000000000..ddec0901965
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ * Unit tests for {@link BroadcastDatasourceLoadingSpec}: the predefined {@code ALL} and {@code NONE} specs,
+ * {@code loadOnly(...)}, and {@code createFromContext(...)} with both valid and invalid context values.
+ */
+@RunWith(JUnitParamsRunner.class)
+public class BroadcastDatasourceLoadingSpecTest
+{
+  @Test
+  public void testLoadingAllBroadcastDatasources()
+  {
+    final BroadcastDatasourceLoadingSpec spec = BroadcastDatasourceLoadingSpec.ALL;
+    Assert.assertEquals(BroadcastDatasourceLoadingSpec.Mode.ALL, spec.getMode());
+    Assert.assertNull(spec.getBroadcastDatasourcesToLoad());
+  }
+
+  @Test
+  public void testLoadingNoBroadcastDatasources()
+  {
+    final BroadcastDatasourceLoadingSpec spec = BroadcastDatasourceLoadingSpec.NONE;
+    Assert.assertEquals(BroadcastDatasourceLoadingSpec.Mode.NONE, spec.getMode());
+    Assert.assertNull(spec.getBroadcastDatasourcesToLoad());
+  }
+
+  @Test
+  public void testLoadingOnlyRequiredBroadcastDatasources()
+  {
+    final Set<String> broadcastDatasourcesToLoad = ImmutableSet.of("ds1", "ds2");
+    // The expected set is built independently of the one handed to loadOnly(), so the check is by value.
+    final BroadcastDatasourceLoadingSpec spec =
+        BroadcastDatasourceLoadingSpec.loadOnly(ImmutableSet.of("ds1", "ds2"));
+    Assert.assertEquals(BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED, spec.getMode());
+    Assert.assertEquals(broadcastDatasourcesToLoad, spec.getBroadcastDatasourcesToLoad());
+  }
+
+  @Test
+  public void testLoadingOnlyRequiredBroadcastDatasourcesWithNullSet()
+  {
+    final DruidException exception =
+        Assert.assertThrows(DruidException.class, () -> BroadcastDatasourceLoadingSpec.loadOnly(null));
+    Assert.assertEquals("Expected non-null set of broadcast datasources to load.", exception.getMessage());
+  }
+
+  @Test
+  public void testCreateBroadcastLoadingSpecFromNullContext()
+  {
+    // The supplied default spec is returned when the context is null.
+    Assert.assertEquals(
+        BroadcastDatasourceLoadingSpec.NONE,
+        BroadcastDatasourceLoadingSpec.createFromContext(
+            null,
+            BroadcastDatasourceLoadingSpec.NONE
+        )
+    );
+
+    Assert.assertEquals(
+        BroadcastDatasourceLoadingSpec.ALL,
+        BroadcastDatasourceLoadingSpec.createFromContext(
+            null,
+            BroadcastDatasourceLoadingSpec.ALL
+        )
+    );
+  }
+
+  @Test
+  public void testCreateBroadcastLoadingSpecFromContext()
+  {
+    // Only the listed broadcast datasources are loaded when the context has mode=ONLY_REQUIRED and a datasource list.
+    Assert.assertEquals(
+        BroadcastDatasourceLoadingSpec.loadOnly(ImmutableSet.of("ds1", "ds2")),
+        BroadcastDatasourceLoadingSpec.createFromContext(
+            ImmutableMap.of(
+                BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCES_TO_LOAD, Arrays.asList("ds1", "ds2"),
+                BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED
+            ),
+            BroadcastDatasourceLoadingSpec.ALL
+        )
+    );
+
+    // No broadcast datasources are loaded when the context has mode=NONE, irrespective of the default spec.
+    Assert.assertEquals(
+        BroadcastDatasourceLoadingSpec.NONE,
+        BroadcastDatasourceLoadingSpec.createFromContext(
+            ImmutableMap.of(
+                BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, BroadcastDatasourceLoadingSpec.Mode.NONE),
+            BroadcastDatasourceLoadingSpec.ALL
+        )
+    );
+
+    // All broadcast datasources are loaded when the context has mode=ALL, irrespective of the default spec.
+    Assert.assertEquals(
+        BroadcastDatasourceLoadingSpec.ALL,
+        BroadcastDatasourceLoadingSpec.createFromContext(
+            ImmutableMap.of(BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, BroadcastDatasourceLoadingSpec.Mode.ALL),
+            BroadcastDatasourceLoadingSpec.NONE
+        )
+    );
+  }
+
+  @Test
+  @Parameters(
+      {
+          "NONE1",
+          "A",
+          "Random mode",
+          "all",
+          "only required",
+          "none"
+      }
+  )
+  public void testSpecFromInvalidModeInContext(final String mode)
+  {
+    final DruidException exception = Assert.assertThrows(
+        DruidException.class,
+        () -> BroadcastDatasourceLoadingSpec.createFromContext(
+            ImmutableMap.of(BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, mode),
+            BroadcastDatasourceLoadingSpec.ALL
+        )
+    );
+    Assert.assertEquals(
+        StringUtils.format(
+            "Invalid value of %s[%s]. Allowed values are [ALL, NONE, ONLY_REQUIRED]",
+            BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE,
+            mode
+        ),
+        exception.getMessage()
+    );
+  }
+
+  @Test
+  @Parameters(
+      {
+          "foo bar",
+          "foo]"
+      }
+  )
+  public void testSpecFromInvalidBroadcastDatasourcesInContext(final Object broadcastDatasourcesToLoad)
+  {
+    final DruidException exception = Assert.assertThrows(
+        DruidException.class,
+        () -> BroadcastDatasourceLoadingSpec.createFromContext(
+            ImmutableMap.of(
+                BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCES_TO_LOAD, broadcastDatasourcesToLoad,
+                BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCE_LOADING_MODE, BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED),
+            BroadcastDatasourceLoadingSpec.ALL
+        )
+    );
+    Assert.assertEquals(
+        StringUtils.format(
+            "Invalid value of %s[%s]. Please provide a comma-separated list of "
+            + "broadcast datasource names. For example: [\"datasourceName1\", \"datasourceName2\"]",
+            BroadcastDatasourceLoadingSpec.CTX_BROADCAST_DATASOURCES_TO_LOAD,
+            broadcastDatasourcesToLoad
+        ),
+        exception.getMessage()
+    );
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java
 
b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java
index 7629a6b875c..187725317a2 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java
@@ -36,6 +36,7 @@ import org.apache.druid.segment.loading.StorageLocation;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.server.SegmentManager;
 import org.apache.druid.server.TestSegmentUtils;
+import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
 import org.apache.druid.timeline.DataSegment;
 import org.junit.Assert;
 import org.junit.Before;
@@ -137,7 +138,8 @@ public class SegmentBootstrapperCacheTest
         segmentManager,
         new ServerTypeConfig(ServerType.HISTORICAL),
         coordinatorClient,
-        emitter
+        emitter,
+        new DataSourceTaskIdHolder()
     );
 
     bootstrapper.start();
@@ -164,7 +166,8 @@ public class SegmentBootstrapperCacheTest
         segmentManager,
         new ServerTypeConfig(ServerType.HISTORICAL),
         coordinatorClient,
-        emitter
+        emitter,
+        new DataSourceTaskIdHolder()
     );
 
     bootstrapper.start();
@@ -204,7 +207,8 @@ public class SegmentBootstrapperCacheTest
         segmentManager,
         new ServerTypeConfig(ServerType.HISTORICAL),
         coordinatorClient,
-        emitter
+        emitter,
+        new DataSourceTaskIdHolder()
     );
 
     bootstrapper.start();
diff --git 
a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java
 
b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java
index c41763f1824..fe1424e2700 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperTest.java
@@ -20,13 +20,23 @@
 package org.apache.druid.server.coordination;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Scopes;
+import com.google.inject.name.Names;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.ServerTypeConfig;
+import org.apache.druid.jackson.JacksonModule;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
 import org.apache.druid.timeline.DataSegment;
 import org.junit.Assert;
 import org.junit.Before;
@@ -125,7 +135,8 @@ public class SegmentBootstrapperTest
         segmentManager,
         new ServerTypeConfig(ServerType.HISTORICAL),
         coordinatorClient,
-        serviceEmitter
+        serviceEmitter,
+        new DataSourceTaskIdHolder()
     );
 
     Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
@@ -184,7 +195,8 @@ public class SegmentBootstrapperTest
         segmentManager,
         new ServerTypeConfig(ServerType.HISTORICAL),
         coordinatorClient,
-        serviceEmitter
+        serviceEmitter,
+        new DataSourceTaskIdHolder()
     );
 
     Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
@@ -240,7 +252,8 @@ public class SegmentBootstrapperTest
         segmentManager,
         new ServerTypeConfig(ServerType.HISTORICAL),
         coordinatorClient,
-        serviceEmitter
+        serviceEmitter,
+        new DataSourceTaskIdHolder()
     );
 
     Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
@@ -267,6 +280,129 @@ public class SegmentBootstrapperTest
     bootstrapper.stop();
   }
 
+  /**
+   * With a NONE broadcast loading spec bound for the task, none of the bootstrap segments offered by the
+   * coordinator may be loaded, cached or announced.
+   */
+  @Test
+  public void testLoadNoBootstrapSegments() throws Exception
+  {
+    final Set<DataSegment> segments = new HashSet<>();
+    for (int i = 0; i < COUNT; ++i) {
+      for (String dataSource : new String[]{"test" + i, "test_two" + i}) {
+        segments.add(makeSegment(dataSource, "1", Intervals.of("P1d/2011-04-01")));
+        segments.add(makeSegment(dataSource, "1", Intervals.of("P1d/2011-04-02")));
+      }
+    }
+
+    final Key<BroadcastDatasourceLoadingSpec> specKey = Key.get(
+        BroadcastDatasourceLoadingSpec.class,
+        Names.named(DataSourceTaskIdHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK)
+    );
+    final Injector injector = Guice.createInjector(
+        new JacksonModule(),
+        new LifecycleModule(),
+        binder -> {
+          binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
+          binder.bind(specKey).toInstance(BroadcastDatasourceLoadingSpec.NONE);
+        }
+    );
+    final DataSourceTaskIdHolder taskIdHolder = injector.getInstance(DataSourceTaskIdHolder.class);
+
+    final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
+    final SegmentManager segmentManager = new SegmentManager(cacheManager);
+    final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
+        new SegmentLoadDropHandler(segmentLoaderConfig, segmentAnnouncer, segmentManager),
+        segmentLoaderConfig,
+        segmentAnnouncer,
+        serverAnnouncer,
+        segmentManager,
+        new ServerTypeConfig(ServerType.HISTORICAL),
+        new TestCoordinatorClient(segments),
+        serviceEmitter,
+        taskIdHolder
+    );
+
+    Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
+
+    bootstrapper.start();
+
+    // The server is still announced once, but no datasource gets loaded.
+    Assert.assertEquals(1, serverAnnouncer.getObservedCount());
+    Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
+
+    final ImmutableList<DataSegment> noSegments = ImmutableList.of();
+    Assert.assertEquals(noSegments, segmentAnnouncer.getObservedSegments());
+    Assert.assertEquals(noSegments, cacheManager.getObservedBootstrapSegments());
+    Assert.assertEquals(noSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
+
+    bootstrapper.stop();
+  }
+
+  /**
+   * With an ONLY_REQUIRED spec naming just "test1", only that datasource's bootstrap segments are loaded and
+   * announced; the "test2" segments offered by the coordinator are filtered out.
+   */
+  @Test
+  public void testLoadOnlyRequiredBootstrapSegments() throws Exception
+  {
+    final DataSegment ds1Segment1 = makeSegment("test1", "1", Intervals.of("P1D/2011-04-01"));
+    final DataSegment ds1Segment2 = makeSegment("test1", "1", Intervals.of("P1D/2012-04-01"));
+    final DataSegment ds2Segment1 = makeSegment("test2", "1", Intervals.of("P1d/2011-04-01"));
+    final DataSegment ds2Segment2 = makeSegment("test2", "1", Intervals.of("P1d/2012-04-01"));
+    final Set<DataSegment> segments = new HashSet<>();
+    segments.addAll(ImmutableList.of(ds1Segment1, ds1Segment2, ds2Segment1, ds2Segment2));
+
+    final BroadcastDatasourceLoadingSpec requiredOnly =
+        BroadcastDatasourceLoadingSpec.loadOnly(ImmutableSet.of("test1"));
+    final Key<BroadcastDatasourceLoadingSpec> specKey = Key.get(
+        BroadcastDatasourceLoadingSpec.class,
+        Names.named(DataSourceTaskIdHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK)
+    );
+    final Injector injector = Guice.createInjector(
+        new JacksonModule(),
+        new LifecycleModule(),
+        binder -> {
+          binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
+          binder.bind(specKey).toInstance(requiredOnly);
+        }
+    );
+    final DataSourceTaskIdHolder taskIdHolder = injector.getInstance(DataSourceTaskIdHolder.class);
+
+    final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
+    final SegmentManager segmentManager = new SegmentManager(cacheManager);
+    final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
+        new SegmentLoadDropHandler(segmentLoaderConfig, segmentAnnouncer, segmentManager),
+        segmentLoaderConfig,
+        segmentAnnouncer,
+        serverAnnouncer,
+        segmentManager,
+        new ServerTypeConfig(ServerType.HISTORICAL),
+        new TestCoordinatorClient(segments),
+        serviceEmitter,
+        taskIdHolder
+    );
+
+    Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
+
+    bootstrapper.start();
+
+    Assert.assertEquals(1, serverAnnouncer.getObservedCount());
+    Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
+    Assert.assertEquals(ImmutableSet.of("test1"), segmentManager.getDataSourceNames());
+
+    final ImmutableList<DataSegment> expected = ImmutableList.of(ds1Segment2, ds1Segment1);
+    Assert.assertEquals(expected, segmentAnnouncer.getObservedSegments());
+    Assert.assertEquals(expected, cacheManager.getObservedBootstrapSegments());
+    Assert.assertEquals(expected, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
+
+    // Metrics reflect only the filtered (required) segments.
+    serviceEmitter.verifyValue("segment/bootstrap/count", expected.size());
+    serviceEmitter.verifyEmitted("segment/bootstrap/time", 1);
+
+    bootstrapper.stop();
+  }
+
   @Test
   public void testLoadBootstrapSegmentsWhenExceptionThrown() throws Exception
   {
@@ -285,7 +421,8 @@ public class SegmentBootstrapperTest
         segmentManager,
         new ServerTypeConfig(ServerType.HISTORICAL),
         coordinatorClient,
-        serviceEmitter
+        serviceEmitter,
+        new DataSourceTaskIdHolder()
     );
 
     Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java 
b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 61a8ab7374e..15374625d30 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -123,6 +123,7 @@ import 
org.apache.druid.segment.realtime.appenderator.PeonAppenderatorsManager;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.ResponseContextConfig;
 import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
 import org.apache.druid.server.coordination.SegmentBootstrapper;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordination.ZkCoordinator;
@@ -176,12 +177,26 @@ public class CliPeon extends GuiceRunnable
   private boolean isZkEnabled = true;
 
   /**
+   * <p>This option is deprecated; use {@link #loadBroadcastDatasourcesMode} instead.</p>
+   *
    * If set to "true", the peon will bind classes necessary for loading 
broadcast segments. This is used for
    * queryable tasks, such as streaming ingestion tasks.
+   *
+   * @deprecated Use {@link #loadBroadcastDatasourcesMode} (command line option
+   * {@code --loadBroadcastDatasourceMode}) instead.
    */
-  @Option(name = "--loadBroadcastSegments", title = "loadBroadcastSegments", 
description = "Enable loading of broadcast segments")
+  @Deprecated
+  @Option(name = "--loadBroadcastSegments", title = "loadBroadcastSegments",
+      description = "Enable loading of broadcast segments. This option is 
deprecated and will be removed in a"
+                    + " future release. Use --loadBroadcastDatasourceMode 
instead.")
   public String loadBroadcastSegments = "false";
 
+  /**
+   * Broadcast datasource loading mode. The peon binds the classes required for loading broadcast segments if the
+   * mode is {@link BroadcastDatasourceLoadingSpec.Mode#ALL} or {@link BroadcastDatasourceLoadingSpec.Mode#ONLY_REQUIRED}.
+   * Defaults to {@link BroadcastDatasourceLoadingSpec.Mode#ALL}.
+   *
+   * The value is parsed with {@code BroadcastDatasourceLoadingSpec.Mode.valueOf(...)}, so it must match a mode name
+   * exactly (case-sensitive); any other value fails with an {@code IllegalArgumentException}. Note that the
+   * deprecated {@code --loadBroadcastSegments=true} option also causes these classes to be bound, whatever this
+   * mode is.
+   */
+  @Option(name = "--loadBroadcastDatasourceMode", title = 
"loadBroadcastDatasourceMode",
+      description = "Specify the broadcast datasource loading mode for the 
peon. Supported values are ALL, NONE, ONLY_REQUIRED.")
+  public String loadBroadcastDatasourcesMode = 
BroadcastDatasourceLoadingSpec.Mode.ALL.toString();
+
   @Option(name = "--taskId", title = "taskId", description = "TaskId for 
fetching task.json remotely")
   public String taskId = "";
 
@@ -274,7 +289,11 @@ public class CliPeon extends GuiceRunnable
             binder.bind(ServerTypeConfig.class).toInstance(new 
ServerTypeConfig(ServerType.fromString(serverType)));
             LifecycleModule.register(binder, Server.class);
 
-            if ("true".equals(loadBroadcastSegments)) {
+            final BroadcastDatasourceLoadingSpec.Mode mode =
+                
BroadcastDatasourceLoadingSpec.Mode.valueOf(loadBroadcastDatasourcesMode);
+            if ("true".equals(loadBroadcastSegments)
+                || mode == BroadcastDatasourceLoadingSpec.Mode.ALL
+                || mode == BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED) {
               binder.install(new BroadcastSegmentLoadingModule());
             }
           }
@@ -340,6 +359,14 @@ public class CliPeon extends GuiceRunnable
           {
             return task.getLookupLoadingSpec();
           }
+
+          /**
+           * Exposes the running task's own broadcast datasource loading spec under
+           * {@link DataSourceTaskIdHolder#BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK}, so components reading it through
+           * {@link DataSourceTaskIdHolder} follow the task's choice rather than the holder's default.
+           */
+          @Provides
+          @LazySingleton
+          @Named(DataSourceTaskIdHolder.BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK)
+          public BroadcastDatasourceLoadingSpec 
getBroadcastDatasourcesToLoad(final Task task)
+          {
+            return task.getBroadcastDatasourceLoadingSpec();
+          }
         },
         new QueryablePeonModule(),
         new IndexingServiceInputSourceModule(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to