This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e4d596d825 MSQ: Include worker context maps in WorkOrders. (#17076)
2e4d596d825 is described below

commit 2e4d596d825f46e7471078d69f127d028e8d8d6c
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Sep 17 01:37:21 2024 -0700

    MSQ: Include worker context maps in WorkOrders. (#17076)
    
    * MSQ: Include worker context maps in WorkOrders.
    
    This provides a mechanism to send contexts to workers in long-lived,
    shared JVMs that are not part of the task system.
    
    * Style fixes; additional test coverage.
---
 .../apache/druid/msq/exec/ControllerContext.java   |  5 +-
 .../org/apache/druid/msq/exec/ControllerImpl.java  |  5 +-
 .../java/org/apache/druid/msq/exec/WorkerImpl.java | 80 ++++++++++++++------
 .../msq/indexing/IndexerControllerContext.java     | 78 +++++++++++++------
 .../druid/msq/indexing/IndexerWorkerContext.java   |  5 +-
 .../org/apache/druid/msq/kernel/WorkOrder.java     | 70 ++++++++++++++++-
 .../kernel/controller/ControllerQueryKernel.java   |  3 +-
 .../controller/ControllerQueryKernelConfig.java    | 62 ++++++++++-----
 .../druid/msq/util/MultiStageQueryContext.java     | 18 ++---
 .../apache/druid/msq/exec/QueryValidatorTest.java  |  3 +-
 .../org/apache/druid/msq/exec/WorkerImplTest.java  | 88 ++++++++++++++++++++++
 .../ControllerQueryKernelConfigTest.java           | 83 ++++++++++++++++++++
 .../druid/msq/test/MSQTestControllerContext.java   |  3 +-
 13 files changed, 408 insertions(+), 95 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
index bc449d14120..44b22af3666 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
@@ -28,7 +28,6 @@ import org.apache.druid.msq.indexing.MSQSpec;
 import org.apache.druid.msq.input.InputSpecSlicer;
 import org.apache.druid.msq.input.table.SegmentsInputSlice;
 import org.apache.druid.msq.input.table.TableInputSpec;
-import org.apache.druid.msq.kernel.QueryDefinition;
 import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
 import org.apache.druid.msq.querykit.QueryKit;
 import org.apache.druid.msq.util.MultiStageQueryContext;
@@ -43,7 +42,7 @@ public interface ControllerContext
   /**
    * Configuration for {@link 
org.apache.druid.msq.kernel.controller.ControllerQueryKernel}.
    */
-  ControllerQueryKernelConfig queryKernelConfig(MSQSpec querySpec, 
QueryDefinition queryDef);
+  ControllerQueryKernelConfig queryKernelConfig(String queryId, MSQSpec 
querySpec);
 
   /**
    * Callback from the controller implementation to "register" the controller. 
Used in the indexing task implementation
@@ -88,7 +87,7 @@ public interface ControllerContext
    *
    * @param queryId               query ID
    * @param querySpec             query spec
-   * @param queryKernelConfig     config from {@link 
#queryKernelConfig(MSQSpec, QueryDefinition)}
+   * @param queryKernelConfig     config from {@link 
#queryKernelConfig(String, MSQSpec)}
    * @param workerFailureListener listener that receives callbacks when 
workers fail
    */
   WorkerManager newWorkerManager(
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 2a29d40b9fe..8eda56ad857 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -562,8 +562,8 @@ public class ControllerImpl implements Controller
   private QueryDefinition initializeQueryDefAndState(final Closer closer)
   {
     this.selfDruidNode = context.selfNode();
-    this.netClient = new 
ExceptionWrappingWorkerClient(context.newWorkerClient());
-    closer.register(netClient);
+    this.netClient = closer.register(new 
ExceptionWrappingWorkerClient(context.newWorkerClient()));
+    this.queryKernelConfig = context.queryKernelConfig(queryId, querySpec);
 
     final QueryContext queryContext = querySpec.getQuery().context();
     final QueryDefinition queryDef = makeQueryDefinition(
@@ -594,7 +594,6 @@ public class ControllerImpl implements Controller
     QueryValidator.validateQueryDef(queryDef);
     queryDefRef.set(queryDef);
 
-    queryKernelConfig = context.queryKernelConfig(querySpec, queryDef);
     workerManager = context.newWorkerManager(
         queryId,
         querySpec,
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index 74e3850c6e9..702302f7ea1 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -48,7 +48,6 @@ import org.apache.druid.msq.counters.CounterSnapshotsTree;
 import org.apache.druid.msq.counters.CounterTracker;
 import org.apache.druid.msq.indexing.InputChannelFactory;
 import org.apache.druid.msq.indexing.MSQWorkerTask;
-import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
 import org.apache.druid.msq.indexing.error.CanceledFault;
 import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
 import org.apache.druid.msq.indexing.error.MSQErrorReport;
@@ -388,7 +387,6 @@ public class WorkerImpl implements Worker
     final InputChannelFactory inputChannelFactory =
         makeBaseInputChannelFactory(workOrder, controllerClient, 
kernelHolder.processorCloser);
 
-    final QueryContext queryContext = task != null ? 
QueryContext.of(task.getContext()) : QueryContext.empty();
     final boolean includeAllCounters = context.includeAllCounters();
     final RunWorkOrder runWorkOrder = new RunWorkOrder(
         workOrder,
@@ -402,8 +400,8 @@ public class WorkerImpl implements Worker
         context,
         frameContext,
         makeRunWorkOrderListener(workOrder, controllerClient, 
criticalWarningCodes, maxVerboseParseExceptions),
-        MultiStageQueryContext.isReindex(queryContext),
-        MultiStageQueryContext.removeNullBytes(queryContext)
+        MultiStageQueryContext.isReindex(workOrder.getWorkerContext()),
+        MultiStageQueryContext.removeNullBytes(workOrder.getWorkerContext())
     );
 
     // Set up processorCloser (called when processing is done).
@@ -560,6 +558,13 @@ public class WorkerImpl implements Worker
     return getOrCreateStageOutputHolder(stageId, 
partitionNumber).readRemotelyFrom(offset);
   }
 
+  /**
+   * Accept a new {@link WorkOrder} for execution.
+   *
+   * For backwards-compatibility purposes, this method populates {@link 
WorkOrder#getOutputChannelMode()}
+   * and {@link WorkOrder#getWorkerContext()} if the controller did not set 
them. (They are there for newer controllers,
+   * but not older ones.)
+   */
   @Override
   public void postWorkOrder(final WorkOrder workOrder)
   {
@@ -577,28 +582,11 @@ public class WorkerImpl implements Worker
       );
     }
 
-    final OutputChannelMode outputChannelMode;
-
-    // This stack of conditions can be removed once we can rely on 
OutputChannelMode always being in the WorkOrder.
-    // (It will be there for newer controllers; this is a 
backwards-compatibility thing.)
-    if (workOrder.hasOutputChannelMode()) {
-      outputChannelMode = workOrder.getOutputChannelMode();
-    } else {
-      final MSQSelectDestination selectDestination =
-          task != null
-          ? 
MultiStageQueryContext.getSelectDestination(QueryContext.of(task.getContext()))
-          : MSQSelectDestination.TASKREPORT;
-
-      outputChannelMode = ControllerQueryKernelUtils.getOutputChannelMode(
-          workOrder.getQueryDefinition(),
-          workOrder.getStageNumber(),
-          selectDestination,
-          task != null && 
MultiStageQueryContext.isDurableStorageEnabled(QueryContext.of(task.getContext())),
-          false
-      );
-    }
+    final WorkOrder workOrderToUse = makeWorkOrderToUse(
+        workOrder,
+        task != null && task.getContext() != null ? 
QueryContext.of(task.getContext()) : QueryContext.empty()
+    );
 
-    final WorkOrder workOrderToUse = 
workOrder.withOutputChannelMode(outputChannelMode);
     kernelManipulationQueue.add(
         kernelHolders ->
             kernelHolders.addKernel(WorkerStageKernel.create(workOrderToUse))
@@ -1009,6 +997,48 @@ public class WorkerImpl implements Worker
     );
   }
 
+  /**
+   * Returns a work order based on the provided "originalWorkOrder", but where 
{@link WorkOrder#hasOutputChannelMode()}
+   * and {@link WorkOrder#hasWorkerContext()} are both true. If the original 
work order didn't have those fields, they
+   * are populated from the "taskContext". Otherwise the "taskContext" is 
ignored.
+   *
+   * This method can be removed once we can rely on these fields always being 
set in the WorkOrder.
+   * (They will be there for newer controllers; this is a 
backwards-compatibility method.)
+   *
+   * @param originalWorkOrder work order from controller
+   * @param taskContext       task context
+   */
+  static WorkOrder makeWorkOrderToUse(final WorkOrder originalWorkOrder, 
@Nullable final QueryContext taskContext)
+  {
+    // This condition can be removed once we can rely on QueryContext always 
being in the WorkOrder.
+    // (It will be there for newer controllers; this is a 
backwards-compatibility thing.)
+    final QueryContext queryContext;
+    if (originalWorkOrder.hasWorkerContext()) {
+      queryContext = originalWorkOrder.getWorkerContext();
+    } else if (taskContext != null) {
+      queryContext = taskContext;
+    } else {
+      queryContext = QueryContext.empty();
+    }
+
+    // This stack of conditions can be removed once we can rely on 
OutputChannelMode always being in the WorkOrder.
+    // (It will be there for newer controllers; this is a 
backwards-compatibility thing.)
+    final OutputChannelMode outputChannelMode;
+    if (originalWorkOrder.hasOutputChannelMode()) {
+      outputChannelMode = originalWorkOrder.getOutputChannelMode();
+    } else {
+      outputChannelMode = ControllerQueryKernelUtils.getOutputChannelMode(
+          originalWorkOrder.getQueryDefinition(),
+          originalWorkOrder.getStageNumber(),
+          MultiStageQueryContext.getSelectDestination(queryContext),
+          MultiStageQueryContext.isDurableStorageEnabled(queryContext),
+          false
+      );
+    }
+
+    return 
originalWorkOrder.withWorkerContext(queryContext).withOutputChannelMode(outputChannelMode);
+  }
+
   /**
    * Log (at DEBUG level) a string explaining the status of all work assigned 
to this worker.
    */
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 42808f64742..589b17d632b 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -46,7 +46,7 @@ import org.apache.druid.msq.indexing.error.MSQWarnings;
 import org.apache.druid.msq.indexing.error.UnknownFault;
 import org.apache.druid.msq.input.InputSpecSlicer;
 import org.apache.druid.msq.input.table.TableInputSpecSlicer;
-import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.DruidMetrics;
@@ -66,6 +66,8 @@ import java.util.concurrent.TimeUnit;
  */
 public class IndexerControllerContext implements ControllerContext
 {
+  public static final int DEFAULT_MAX_CONCURRENT_STAGES = 1;
+
   private static final Logger log = new Logger(IndexerControllerContext.class);
 
   private final MSQControllerTask task;
@@ -96,21 +98,21 @@ public class IndexerControllerContext implements 
ControllerContext
 
   @Override
   public ControllerQueryKernelConfig queryKernelConfig(
-      final MSQSpec querySpec,
-      final QueryDefinition queryDef
+      final String queryId,
+      final MSQSpec querySpec
   )
   {
     final ControllerMemoryParameters memoryParameters =
         ControllerMemoryParameters.createProductionInstance(
             memoryIntrospector,
-            queryDef.getFinalStageDefinition().getMaxWorkerCount()
+            querySpec.getTuningConfig().getMaxNumWorkers()
         );
 
     final ControllerQueryKernelConfig config = 
makeQueryKernelConfig(querySpec, memoryParameters);
 
     log.debug(
         "Query[%s] using %s[%s], %s[%s], %s[%s].",
-        queryDef.getQueryId(),
+        queryId,
         MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE,
         config.isDurableStorage(),
         MultiStageQueryContext.CTX_FAULT_TOLERANCE,
@@ -210,7 +212,7 @@ public class IndexerControllerContext implements 
ControllerContext
   }
 
   /**
-   * Helper method for {@link #queryKernelConfig(MSQSpec, QueryDefinition)}. 
Also used in tests.
+   * Helper method for {@link #queryKernelConfig(String, MSQSpec)}. Also used 
in tests.
    */
   public static ControllerQueryKernelConfig makeQueryKernelConfig(
       final MSQSpec querySpec,
@@ -218,7 +220,8 @@ public class IndexerControllerContext implements 
ControllerContext
   )
   {
     final QueryContext queryContext = querySpec.getQuery().context();
-    final int maxConcurrentStages = 
MultiStageQueryContext.getMaxConcurrentStages(queryContext);
+    final int maxConcurrentStages =
+        MultiStageQueryContext.getMaxConcurrentStagesWithDefault(queryContext, 
DEFAULT_MAX_CONCURRENT_STAGES);
     final boolean isFaultToleranceEnabled = 
MultiStageQueryContext.isFaultToleranceEnabled(queryContext);
     final boolean isDurableStorageEnabled;
 
@@ -256,9 +259,44 @@ public class IndexerControllerContext implements 
ControllerContext
         .destination(querySpec.getDestination())
         .maxConcurrentStages(maxConcurrentStages)
         
.maxRetainedPartitionSketchBytes(memoryParameters.getPartitionStatisticsMaxRetainedBytes())
+        .workerContextMap(makeWorkerContextMap(querySpec, 
isDurableStorageEnabled, maxConcurrentStages))
         .build();
   }
 
+  /**
+   * Helper method for {@link #makeQueryKernelConfig} and {@link 
#makeTaskContext}. Makes the worker context map,
+   * i.e., the map that will become {@link WorkOrder#getWorkerContext()}.
+   */
+  public static Map<String, Object> makeWorkerContextMap(
+      final MSQSpec querySpec,
+      final boolean durableStorageEnabled,
+      final int maxConcurrentStages
+  )
+  {
+    final QueryContext queryContext = querySpec.getQuery().context();
+    final long maxParseExceptions = 
MultiStageQueryContext.getMaxParseExceptions(queryContext);
+    final boolean removeNullBytes = 
MultiStageQueryContext.removeNullBytes(queryContext);
+    final boolean includeAllCounters = 
MultiStageQueryContext.getIncludeAllCounters(queryContext);
+    final ImmutableMap.Builder<String, Object> builder = 
ImmutableMap.builder();
+
+    builder
+        .put(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, 
durableStorageEnabled)
+        .put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, maxParseExceptions)
+        .put(MultiStageQueryContext.CTX_IS_REINDEX, 
MSQControllerTask.isReplaceInputDataSourceTask(querySpec))
+        .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, 
maxConcurrentStages)
+        .put(MultiStageQueryContext.CTX_REMOVE_NULL_BYTES, removeNullBytes)
+        .put(MultiStageQueryContext.CTX_INCLUDE_ALL_COUNTERS, 
includeAllCounters);
+
+    if (querySpec.getDestination().toSelectDestination() != null) {
+      builder.put(
+          MultiStageQueryContext.CTX_SELECT_DESTINATION,
+          querySpec.getDestination().toSelectDestination().getName()
+      );
+    }
+
+    return builder.build();
+  }
+
   /**
    * Helper method for {@link #newWorkerManager}, split out to be used in 
tests.
    *
@@ -271,17 +309,16 @@ public class IndexerControllerContext implements 
ControllerContext
   )
   {
     final ImmutableMap.Builder<String, Object> taskContextOverridesBuilder = 
ImmutableMap.builder();
-    final long maxParseExceptions = 
MultiStageQueryContext.getMaxParseExceptions(querySpec.getQuery().context());
-    final boolean removeNullBytes = 
MultiStageQueryContext.removeNullBytes(querySpec.getQuery().context());
-    final boolean includeAllCounters = 
MultiStageQueryContext.getIncludeAllCounters(querySpec.getQuery().context());
 
-    taskContextOverridesBuilder
-        .put(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, 
queryKernelConfig.isDurableStorage())
-        .put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, maxParseExceptions)
-        .put(MultiStageQueryContext.CTX_IS_REINDEX, 
MSQControllerTask.isReplaceInputDataSourceTask(querySpec))
-        .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, 
queryKernelConfig.getMaxConcurrentStages())
-        .put(MultiStageQueryContext.CTX_REMOVE_NULL_BYTES, removeNullBytes)
-        .put(MultiStageQueryContext.CTX_INCLUDE_ALL_COUNTERS, 
includeAllCounters);
+    // Put worker context into the task context. That way, workers can get 
these context keys either from
+    // WorkOrder#getWorkerContext or Task#getContext.
+    taskContextOverridesBuilder.putAll(
+        makeWorkerContextMap(
+            querySpec,
+            queryKernelConfig.isDurableStorage(),
+            queryKernelConfig.getMaxConcurrentStages()
+        )
+    );
 
     // Put the lookup loading info in the task context to facilitate selective 
loading of lookups.
     if (controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE) 
!= null) {
@@ -297,13 +334,6 @@ public class IndexerControllerContext implements 
ControllerContext
       );
     }
 
-    if (querySpec.getDestination().toSelectDestination() != null) {
-      taskContextOverridesBuilder.put(
-          MultiStageQueryContext.CTX_SELECT_DESTINATION,
-          querySpec.getDestination().toSelectDestination().getName()
-      );
-    }
-
     // propagate the controller's tags to the worker task for enhanced metrics 
reporting
     @SuppressWarnings("unchecked")
     Map<String, Object> tags = (Map<String, Object>) 
controllerTaskContext.get(DruidMetrics.TAGS);
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
index 2a7d91c40af..fbb0bff9556 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
@@ -116,7 +116,10 @@ public class IndexerWorkerContext implements WorkerContext
     this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;
 
     final QueryContext queryContext = QueryContext.of(task.getContext());
-    this.maxConcurrentStages = 
MultiStageQueryContext.getMaxConcurrentStages(queryContext);
+    this.maxConcurrentStages = 
MultiStageQueryContext.getMaxConcurrentStagesWithDefault(
+        queryContext,
+        IndexerControllerContext.DEFAULT_MAX_CONCURRENT_STAGES
+    );
     this.includeAllCounters = 
MultiStageQueryContext.getIncludeAllCounters(queryContext);
   }
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkOrder.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkOrder.java
index 0c857870210..2a45605826b 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkOrder.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkOrder.java
@@ -26,9 +26,11 @@ import com.google.common.base.Preconditions;
 import org.apache.druid.msq.exec.ControllerClient;
 import org.apache.druid.msq.exec.OutputChannelMode;
 import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.query.QueryContext;
 
 import javax.annotation.Nullable;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -51,9 +53,18 @@ public class WorkOrder
   @Nullable
   private final List<String> workerIds;
 
+  /**
+   * Always non-null for newer controllers. This is marked nullable for 
backwards-compatibility reasons.
+   */
   @Nullable
   private final OutputChannelMode outputChannelMode;
 
+  /**
+   * Always non-null for newer controllers. This is marked nullable for 
backwards-compatibility reasons.
+   */
+  @Nullable
+  private final QueryContext workerContext;
+
   @JsonCreator
   @SuppressWarnings("rawtypes")
   public WorkOrder(
@@ -63,7 +74,8 @@ public class WorkOrder
       @JsonProperty("input") final List<InputSlice> workerInputs,
       @JsonProperty("extra") @Nullable final ExtraInfoHolder extraInfoHolder,
       @JsonProperty("workers") @Nullable final List<String> workerIds,
-      @JsonProperty("output") @Nullable final OutputChannelMode 
outputChannelMode
+      @JsonProperty("output") @Nullable final OutputChannelMode 
outputChannelMode,
+      @JsonProperty("context") @Nullable final Map<String, Object> 
workerContext
   )
   {
     this.queryDefinition = Preconditions.checkNotNull(queryDefinition, 
"queryDefinition");
@@ -73,6 +85,7 @@ public class WorkOrder
     this.extraInfoHolder = extraInfoHolder;
     this.workerIds = workerIds;
     this.outputChannelMode = outputChannelMode;
+    this.workerContext = workerContext != null ? 
QueryContext.of(workerContext) : null;
   }
 
   @JsonProperty("query")
@@ -124,6 +137,10 @@ public class WorkOrder
     return outputChannelMode != null;
   }
 
+  /**
+   * Retrieves the output channel mode set by the controller. Null means the 
controller didn't set it, which means
+   * we're dealing with an older controller.
+   */
   @Nullable
   @JsonProperty("output")
   @JsonInclude(JsonInclude.Include.NON_NULL)
@@ -132,6 +149,29 @@ public class WorkOrder
     return outputChannelMode;
   }
 
+  public boolean hasWorkerContext()
+  {
+    return workerContext != null;
+  }
+
+  /**
+   * Retrieves the query context set by the controller. Null means the 
controller didn't set it, which means
+   * we're dealing with an older controller.
+   */
+  @Nullable
+  public QueryContext getWorkerContext()
+  {
+    return workerContext;
+  }
+
+  @Nullable
+  @JsonProperty("context")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public Map<String, Object> getContextForSerialization()
+  {
+    return workerContext != null ? workerContext.asMap() : null;
+  }
+
   @Nullable
   public Object getExtraInfo()
   {
@@ -155,7 +195,26 @@ public class WorkOrder
           workerInputs,
           extraInfoHolder,
           workerIds,
-          newOutputChannelMode
+          newOutputChannelMode,
+          workerContext != null ? workerContext.asMap() : null
+      );
+    }
+  }
+
+  public WorkOrder withWorkerContext(final QueryContext newContext)
+  {
+    if (Objects.equals(newContext, this.workerContext)) {
+      return this;
+    } else {
+      return new WorkOrder(
+          queryDefinition,
+          stageNumber,
+          workerNumber,
+          workerInputs,
+          extraInfoHolder,
+          workerIds,
+          outputChannelMode,
+          newContext.asMap()
       );
     }
   }
@@ -176,7 +235,8 @@ public class WorkOrder
            && Objects.equals(workerInputs, workOrder.workerInputs)
            && Objects.equals(extraInfoHolder, workOrder.extraInfoHolder)
            && Objects.equals(workerIds, workOrder.workerIds)
-           && Objects.equals(outputChannelMode, workOrder.outputChannelMode);
+           && Objects.equals(outputChannelMode, workOrder.outputChannelMode)
+           && Objects.equals(workerContext, workOrder.workerContext);
   }
 
   @Override
@@ -189,7 +249,8 @@ public class WorkOrder
         workerInputs,
         extraInfoHolder,
         workerIds,
-        outputChannelMode
+        outputChannelMode,
+        workerContext
     );
   }
 
@@ -204,6 +265,7 @@ public class WorkOrder
            ", extraInfoHolder=" + extraInfoHolder +
            ", workerIds=" + workerIds +
            ", outputChannelMode=" + outputChannelMode +
+           ", context=" + workerContext +
            '}';
   }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
index b01091f9ad7..62a13326909 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
@@ -302,7 +302,8 @@ public class ControllerQueryKernel
           workerInputs.inputsForWorker(workerNumber),
           extraInfoHolder,
           config.getWorkerIds(),
-          outputChannelMode
+          outputChannelMode,
+          config.getWorkerContextMap()
       );
 
       QueryValidator.validateWorkOrder(workOrder);
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfig.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfig.java
index 5c754aedd4f..f7516c63c92 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfig.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfig.java
@@ -21,9 +21,12 @@ package org.apache.druid.msq.kernel.controller;
 
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.msq.indexing.destination.MSQDestination;
+import org.apache.druid.msq.kernel.WorkOrder;
 
 import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -37,22 +40,22 @@ public class ControllerQueryKernelConfig
   private final boolean durableStorage;
   private final boolean faultTolerance;
   private final MSQDestination destination;
-
   @Nullable
-  private final String controllerId;
-
+  private final String controllerHost;
   @Nullable
   private final List<String> workerIds;
+  private final Map<String, Object> workerContextMap;
 
-  private ControllerQueryKernelConfig(
+  ControllerQueryKernelConfig(
       int maxRetainedPartitionSketchBytes,
       int maxConcurrentStages,
       boolean pipeline,
       boolean durableStorage,
       boolean faultTolerance,
       MSQDestination destination,
-      @Nullable String controllerId,
-      @Nullable List<String> workerIds
+      @Nullable String controllerHost,
+      @Nullable List<String> workerIds,
+      Map<String, Object> workerContextMap
   )
   {
     if (maxRetainedPartitionSketchBytes <= 0) {
@@ -85,8 +88,9 @@ public class ControllerQueryKernelConfig
     this.durableStorage = durableStorage;
     this.faultTolerance = faultTolerance;
     this.destination = destination;
-    this.controllerId = controllerId;
+    this.controllerHost = controllerHost;
     this.workerIds = workerIds;
+    this.workerContextMap = workerContextMap;
   }
 
   public static Builder builder()
@@ -130,6 +134,14 @@ public class ControllerQueryKernelConfig
     return workerIds;
   }
 
+  /**
+   * Map to include in {@link WorkOrder}, as {@link 
WorkOrder#getWorkerContext()}.
+   */
+  public Map<String, Object> getWorkerContextMap()
+  {
+    return workerContextMap;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -145,8 +157,10 @@ public class ControllerQueryKernelConfig
            && pipeline == that.pipeline
            && durableStorage == that.durableStorage
            && faultTolerance == that.faultTolerance
-           && Objects.equals(controllerId, that.controllerId)
-           && Objects.equals(workerIds, that.workerIds);
+           && Objects.equals(destination, that.destination)
+           && Objects.equals(controllerHost, that.controllerHost)
+           && Objects.equals(workerIds, that.workerIds)
+           && Objects.equals(workerContextMap, that.workerContextMap);
   }
 
   @Override
@@ -158,8 +172,10 @@ public class ControllerQueryKernelConfig
         pipeline,
         durableStorage,
         faultTolerance,
-        controllerId,
-        workerIds
+        destination,
+        controllerHost,
+        workerIds,
+        workerContextMap
     );
   }
 
@@ -171,9 +187,11 @@ public class ControllerQueryKernelConfig
            ", maxConcurrentStages=" + maxConcurrentStages +
            ", pipeline=" + pipeline +
            ", durableStorage=" + durableStorage +
-           ", faultTolerant=" + faultTolerance +
-           ", controllerId='" + controllerId + '\'' +
+           ", faultTolerance=" + faultTolerance +
+           ", destination=" + destination +
+           ", controllerHost='" + controllerHost + '\'' +
            ", workerIds=" + workerIds +
+           ", workerContextMap=" + workerContextMap +
            '}';
   }
 
@@ -185,8 +203,9 @@ public class ControllerQueryKernelConfig
     private boolean durableStorage;
     private boolean faultTolerant;
     private MSQDestination destination;
-    private String controllerId;
+    private String controllerHost;
     private List<String> workerIds;
+    private Map<String, Object> workerContextMap = Collections.emptyMap();
 
     /**
      * Use {@link #builder()}.
@@ -231,9 +250,9 @@ public class ControllerQueryKernelConfig
       return this;
     }
 
-    public Builder controllerId(final String controllerId)
+    public Builder controllerHost(final String controllerHost)
     {
-      this.controllerId = controllerId;
+      this.controllerHost = controllerHost;
       return this;
     }
 
@@ -243,6 +262,12 @@ public class ControllerQueryKernelConfig
       return this;
     }
 
+    public Builder workerContextMap(final Map<String, Object> workerContextMap)
+    {
+      this.workerContextMap = workerContextMap;
+      return this;
+    }
+
     public ControllerQueryKernelConfig build()
     {
       return new ControllerQueryKernelConfig(
@@ -252,8 +277,9 @@ public class ControllerQueryKernelConfig
           durableStorage,
           faultTolerant,
           destination,
-          controllerId,
-          workerIds
+          controllerHost,
+          workerIds,
+          workerContextMap
       );
     }
   }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 63601c907a2..4ed98dca594 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -119,7 +119,6 @@ public class MultiStageQueryContext
   public static final SegmentSource DEFAULT_INCLUDE_SEGMENT_SOURCE = 
SegmentSource.NONE;
 
   public static final String CTX_MAX_CONCURRENT_STAGES = "maxConcurrentStages";
-  public static final int DEFAULT_MAX_CONCURRENT_STAGES = 1;
   public static final String CTX_DURABLE_SHUFFLE_STORAGE = 
"durableShuffleStorage";
   private static final boolean DEFAULT_DURABLE_SHUFFLE_STORAGE = false;
   public static final String CTX_SELECT_DESTINATION = "selectDestination";
@@ -206,11 +205,14 @@ public class MultiStageQueryContext
     );
   }
 
-  public static int getMaxConcurrentStages(final QueryContext queryContext)
+  public static int getMaxConcurrentStagesWithDefault(
+      final QueryContext queryContext,
+      final int defaultMaxConcurrentStages
+  )
   {
     return queryContext.getInt(
         CTX_MAX_CONCURRENT_STAGES,
-        DEFAULT_MAX_CONCURRENT_STAGES
+        defaultMaxConcurrentStages
     );
   }
 
@@ -336,16 +338,6 @@ public class MultiStageQueryContext
     );
   }
 
-  @Nullable
-  public static MSQSelectDestination getSelectDestinationOrNull(final 
QueryContext queryContext)
-  {
-    return QueryContexts.getAsEnum(
-        CTX_SELECT_DESTINATION,
-        queryContext.getString(CTX_SELECT_DESTINATION),
-        MSQSelectDestination.class
-    );
-  }
-
   public static int getRowsInMemory(final QueryContext queryContext)
   {
     return queryContext.getInt(CTX_ROWS_IN_MEMORY, DEFAULT_ROWS_IN_MEMORY);
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java
index d7364124483..c1d1030fb08 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java
@@ -108,6 +108,7 @@ public class QueryValidatorTest
         Collections.singletonList(() -> inputFiles), // Slice with a large 
number of inputFiles
         null,
         null,
+        null,
         null
     );
 
@@ -125,7 +126,7 @@ public class QueryValidatorTest
     QueryValidator.validateWorkOrder(workOrder);
   }
 
-  private static QueryDefinition createQueryDefinition(int numColumns, int 
numWorkers)
+  public static QueryDefinition createQueryDefinition(int numColumns, int 
numWorkers)
   {
     QueryDefinitionBuilder builder = 
QueryDefinition.builder(UUID.randomUUID().toString());
 
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java
new file mode 100644
index 00000000000..32cd36d0998
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.msq.kernel.WorkOrder;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.QueryContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Tests for {@link WorkerImpl#makeWorkOrderToUse}. That method takes a {@link WorkOrder} and a task
+ * {@link QueryContext}, and returns the work order the worker should actually run.
+ */
+public class WorkerImplTest
+{
+  /**
+   * The work order already carries an output channel mode and a worker context map. Its own context conflicts
+   * with the task context on key "foo". makeWorkOrderToUse is expected to return the very same instance.
+   */
+  @Test
+  public void test_makeWorkOrderToUse_nothingMissing()
+  {
+    // Output channel mode (MEMORY) and worker context map are both already set on this work order.
+    final WorkOrder workOrder = new WorkOrder(
+        QueryValidatorTest.createQueryDefinition(10, 2),
+        0,
+        0,
+        Collections.singletonList(() -> 1),
+        null,
+        null,
+        OutputChannelMode.MEMORY,
+        ImmutableMap.of("foo", "bar")
+    );
+
+    Assert.assertSame(
+        workOrder,
+        WorkerImpl.makeWorkOrderToUse(
+            workOrder,
+            QueryContext.of(ImmutableMap.of("foo", "baz")) /* Conflicts with 
workOrder context; should be ignored */
+        )
+    );
+  }
+
+  /**
+   * The work order has neither an output channel mode nor a worker context map. makeWorkOrderToUse is
+   * expected to return an equal-but-filled-in work order. That work order takes its worker context from the
+   * task context. Its output channel mode is DURABLE_STORAGE_INTERMEDIATE, given durableShuffleStorage=true
+   * in that context.
+   */
+  @Test
+  public void 
test_makeWorkOrderToUse_missingOutputChannelModeAndWorkerContext()
+  {
+    // Task context enables durable shuffle storage; it is also the expected worker context map below.
+    final Map<String, Object> taskContext =
+        ImmutableMap.of("foo", "bar", 
MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, true);
+
+    // The last two arguments (output channel mode, worker context map) are deliberately null.
+    final WorkOrder workOrder = new WorkOrder(
+        QueryValidatorTest.createQueryDefinition(10, 2),
+        1,
+        2,
+        Collections.singletonList(() -> 1),
+        null,
+        null,
+        null,
+        null
+    );
+
+    // Expected: same definition/stage/worker/inputs, with the two missing fields filled in.
+    Assert.assertEquals(
+        new WorkOrder(
+            workOrder.getQueryDefinition(),
+            workOrder.getStageNumber(),
+            workOrder.getWorkerNumber(),
+            workOrder.getInputs(),
+            null,
+            null,
+            OutputChannelMode.DURABLE_STORAGE_INTERMEDIATE,
+            taskContext
+        ),
+        WorkerImpl.makeWorkOrderToUse(workOrder, QueryContext.of(taskContext))
+    );
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfigTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfigTest.java
new file mode 100644
index 00000000000..765101359f6
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfigTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.kernel.controller;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
+import org.apache.druid.msq.indexing.destination.MSQDestination;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for {@link ControllerQueryKernelConfig}: construction through the constructor versus the builder,
+ * and the equals/hashCode contract.
+ */
+public class ControllerQueryKernelConfigTest
+{
+  /**
+   * Builds one config with the constructor and one with {@link ControllerQueryKernelConfig#builder()}, from the
+   * same values (including controllerHost and workerContextMap), and asserts the two are equal.
+   */
+  @Test
+  public void testBuilder()
+  {
+    int maxRetainedPartitionSketchBytes = 1;
+    int maxConcurrentStages = 2;
+    boolean pipeline = false;
+    boolean durableStorage = true;
+    boolean faultTolerance = true;
+    MSQDestination destination = DurableStorageMSQDestination.instance();
+    String controllerHost = "controllerHost";
+    List<String> workerIds = ImmutableList.of("worker1", "worker2");
+    Map<String, Object> workerContextMap = ImmutableMap.of("foo", "bar");
+
+    final ControllerQueryKernelConfig config1 = new 
ControllerQueryKernelConfig(
+        maxRetainedPartitionSketchBytes,
+        maxConcurrentStages,
+        pipeline,
+        durableStorage,
+        faultTolerance,
+        destination,
+        controllerHost,
+        workerIds,
+        workerContextMap
+    );
+
+    final ControllerQueryKernelConfig config2 = ControllerQueryKernelConfig
+        .builder()
+        .maxRetainedPartitionSketchBytes(maxRetainedPartitionSketchBytes)
+        .maxConcurrentStages(maxConcurrentStages)
+        .pipeline(pipeline)
+        .durableStorage(durableStorage)
+        .faultTolerance(faultTolerance)
+        .destination(destination)
+        .controllerHost(controllerHost)
+        .workerIds(workerIds)
+        .workerContextMap(workerContextMap)
+        .build();
+
+    Assert.assertEquals(config1, config2);
+  }
+
+  /**
+   * Verifies the equals/hashCode contract with EqualsVerifier. usingGetClass() tells EqualsVerifier that equals()
+   * compares classes with getClass() rather than instanceof.
+   */
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(ControllerQueryKernelConfig.class)
+                  .usingGetClass()
+                  .verify();
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 3034be39984..ed518afd2ef 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -59,7 +59,6 @@ import org.apache.druid.msq.indexing.MSQWorkerTask;
 import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
 import org.apache.druid.msq.input.InputSpecSlicer;
 import org.apache.druid.msq.input.table.TableInputSpecSlicer;
-import org.apache.druid.msq.kernel.QueryDefinition;
 import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.QueryContext;
@@ -269,7 +268,7 @@ public class MSQTestControllerContext implements 
ControllerContext
   };
 
   @Override
-  public ControllerQueryKernelConfig queryKernelConfig(MSQSpec querySpec, 
QueryDefinition queryDef)
+  public ControllerQueryKernelConfig queryKernelConfig(String queryId, MSQSpec 
querySpec)
   {
     return IndexerControllerContext.makeQueryKernelConfig(querySpec, new 
ControllerMemoryParameters(100_000_000));
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to