This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2e4d596d825 MSQ: Include worker context maps in WorkOrders. (#17076)
2e4d596d825 is described below
commit 2e4d596d825f46e7471078d69f127d028e8d8d6c
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Sep 17 01:37:21 2024 -0700
MSQ: Include worker context maps in WorkOrders. (#17076)
* MSQ: Include worker context maps in WorkOrders.
This provides a mechanism to send contexts to workers in long-lived,
shared JVMs that are not part of the task system.
* Style, coverage.
---
.../apache/druid/msq/exec/ControllerContext.java | 5 +-
.../org/apache/druid/msq/exec/ControllerImpl.java | 5 +-
.../java/org/apache/druid/msq/exec/WorkerImpl.java | 80 ++++++++++++++------
.../msq/indexing/IndexerControllerContext.java | 78 +++++++++++++------
.../druid/msq/indexing/IndexerWorkerContext.java | 5 +-
.../org/apache/druid/msq/kernel/WorkOrder.java | 70 ++++++++++++++++-
.../kernel/controller/ControllerQueryKernel.java | 3 +-
.../controller/ControllerQueryKernelConfig.java | 62 ++++++++++-----
.../druid/msq/util/MultiStageQueryContext.java | 18 ++---
.../apache/druid/msq/exec/QueryValidatorTest.java | 3 +-
.../org/apache/druid/msq/exec/WorkerImplTest.java | 88 ++++++++++++++++++++++
.../ControllerQueryKernelConfigTest.java | 83 ++++++++++++++++++++
.../druid/msq/test/MSQTestControllerContext.java | 3 +-
13 files changed, 408 insertions(+), 95 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
index bc449d14120..44b22af3666 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
@@ -28,7 +28,6 @@ import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.input.table.TableInputSpec;
-import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.msq.util.MultiStageQueryContext;
@@ -43,7 +42,7 @@ public interface ControllerContext
/**
* Configuration for {@link
org.apache.druid.msq.kernel.controller.ControllerQueryKernel}.
*/
- ControllerQueryKernelConfig queryKernelConfig(MSQSpec querySpec,
QueryDefinition queryDef);
+ ControllerQueryKernelConfig queryKernelConfig(String queryId, MSQSpec
querySpec);
/**
* Callback from the controller implementation to "register" the controller.
Used in the indexing task implementation
@@ -88,7 +87,7 @@ public interface ControllerContext
*
* @param queryId query ID
* @param querySpec query spec
- * @param queryKernelConfig config from {@link
#queryKernelConfig(MSQSpec, QueryDefinition)}
+ * @param queryKernelConfig config from {@link
#queryKernelConfig(String, MSQSpec)}
* @param workerFailureListener listener that receives callbacks when
workers fail
*/
WorkerManager newWorkerManager(
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 2a29d40b9fe..8eda56ad857 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -562,8 +562,8 @@ public class ControllerImpl implements Controller
private QueryDefinition initializeQueryDefAndState(final Closer closer)
{
this.selfDruidNode = context.selfNode();
- this.netClient = new
ExceptionWrappingWorkerClient(context.newWorkerClient());
- closer.register(netClient);
+ this.netClient = closer.register(new
ExceptionWrappingWorkerClient(context.newWorkerClient()));
+ this.queryKernelConfig = context.queryKernelConfig(queryId, querySpec);
final QueryContext queryContext = querySpec.getQuery().context();
final QueryDefinition queryDef = makeQueryDefinition(
@@ -594,7 +594,6 @@ public class ControllerImpl implements Controller
QueryValidator.validateQueryDef(queryDef);
queryDefRef.set(queryDef);
- queryKernelConfig = context.queryKernelConfig(querySpec, queryDef);
workerManager = context.newWorkerManager(
queryId,
querySpec,
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index 74e3850c6e9..702302f7ea1 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -48,7 +48,6 @@ import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.counters.CounterTracker;
import org.apache.druid.msq.indexing.InputChannelFactory;
import org.apache.druid.msq.indexing.MSQWorkerTask;
-import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
import org.apache.druid.msq.indexing.error.CanceledFault;
import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
@@ -388,7 +387,6 @@ public class WorkerImpl implements Worker
final InputChannelFactory inputChannelFactory =
makeBaseInputChannelFactory(workOrder, controllerClient,
kernelHolder.processorCloser);
- final QueryContext queryContext = task != null ?
QueryContext.of(task.getContext()) : QueryContext.empty();
final boolean includeAllCounters = context.includeAllCounters();
final RunWorkOrder runWorkOrder = new RunWorkOrder(
workOrder,
@@ -402,8 +400,8 @@ public class WorkerImpl implements Worker
context,
frameContext,
makeRunWorkOrderListener(workOrder, controllerClient,
criticalWarningCodes, maxVerboseParseExceptions),
- MultiStageQueryContext.isReindex(queryContext),
- MultiStageQueryContext.removeNullBytes(queryContext)
+ MultiStageQueryContext.isReindex(workOrder.getWorkerContext()),
+ MultiStageQueryContext.removeNullBytes(workOrder.getWorkerContext())
);
// Set up processorCloser (called when processing is done).
@@ -560,6 +558,13 @@ public class WorkerImpl implements Worker
return getOrCreateStageOutputHolder(stageId,
partitionNumber).readRemotelyFrom(offset);
}
+ /**
+ * Accept a new {@link WorkOrder} for execution.
+ *
+ * For backwards-compatibility purposes, this method populates {@link
WorkOrder#getOutputChannelMode()}
+ * and {@link WorkOrder#getWorkerContext()} if the controller did not set
them. (They are there for newer controllers,
+ * but not older ones.)
+ */
@Override
public void postWorkOrder(final WorkOrder workOrder)
{
@@ -577,28 +582,11 @@ public class WorkerImpl implements Worker
);
}
- final OutputChannelMode outputChannelMode;
-
- // This stack of conditions can be removed once we can rely on
OutputChannelMode always being in the WorkOrder.
- // (It will be there for newer controllers; this is a
backwards-compatibility thing.)
- if (workOrder.hasOutputChannelMode()) {
- outputChannelMode = workOrder.getOutputChannelMode();
- } else {
- final MSQSelectDestination selectDestination =
- task != null
- ?
MultiStageQueryContext.getSelectDestination(QueryContext.of(task.getContext()))
- : MSQSelectDestination.TASKREPORT;
-
- outputChannelMode = ControllerQueryKernelUtils.getOutputChannelMode(
- workOrder.getQueryDefinition(),
- workOrder.getStageNumber(),
- selectDestination,
- task != null &&
MultiStageQueryContext.isDurableStorageEnabled(QueryContext.of(task.getContext())),
- false
- );
- }
+ final WorkOrder workOrderToUse = makeWorkOrderToUse(
+ workOrder,
+ task != null && task.getContext() != null ?
QueryContext.of(task.getContext()) : QueryContext.empty()
+ );
- final WorkOrder workOrderToUse =
workOrder.withOutputChannelMode(outputChannelMode);
kernelManipulationQueue.add(
kernelHolders ->
kernelHolders.addKernel(WorkerStageKernel.create(workOrderToUse))
@@ -1009,6 +997,48 @@ public class WorkerImpl implements Worker
);
}
+ /**
+ * Returns a work order based on the provided "originalWorkOrder", but where
{@link WorkOrder#hasOutputChannelMode()}
+ * and {@link WorkOrder#hasWorkerContext()} are both true. If the original
work order didn't have those fields, they
+ * are populated from the "taskContext". Otherwise the "taskContext" is
ignored.
+ *
+ * This method can be removed once we can rely on these fields always being
set in the WorkOrder.
+ * (They will be there for newer controllers; this is a
backwards-compatibility method.)
+ *
+ * @param originalWorkOrder work order from controller
+ * @param taskContext task context, or null if there is none; only consulted when the work order has no worker context
+ */
+ static WorkOrder makeWorkOrderToUse(final WorkOrder originalWorkOrder,
@Nullable final QueryContext taskContext)
+ {
+ // This condition can be removed once we can rely on QueryContext always
being in the WorkOrder.
+ // (It will be there for newer controllers; this is a
backwards-compatibility thing.)
+ final QueryContext queryContext;
+ if (originalWorkOrder.hasWorkerContext()) {
+ queryContext = originalWorkOrder.getWorkerContext();
+ } else if (taskContext != null) {
+ queryContext = taskContext;
+ } else {
+ queryContext = QueryContext.empty();
+ }
+
+ // This stack of conditions can be removed once we can rely on
OutputChannelMode always being in the WorkOrder.
+ // (It will be there for newer controllers; this is a
backwards-compatibility thing.)
+ final OutputChannelMode outputChannelMode;
+ if (originalWorkOrder.hasOutputChannelMode()) {
+ outputChannelMode = originalWorkOrder.getOutputChannelMode();
+ } else {
+ outputChannelMode = ControllerQueryKernelUtils.getOutputChannelMode(
+ originalWorkOrder.getQueryDefinition(),
+ originalWorkOrder.getStageNumber(),
+ MultiStageQueryContext.getSelectDestination(queryContext),
+ MultiStageQueryContext.isDurableStorageEnabled(queryContext),
+ false
+ );
+ }
+
+ return
originalWorkOrder.withWorkerContext(queryContext).withOutputChannelMode(outputChannelMode);
+ }
+
/**
* Log (at DEBUG level) a string explaining the status of all work assigned
to this worker.
*/
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 42808f64742..589b17d632b 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -46,7 +46,7 @@ import org.apache.druid.msq.indexing.error.MSQWarnings;
import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.input.table.TableInputSpecSlicer;
-import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.DruidMetrics;
@@ -66,6 +66,8 @@ import java.util.concurrent.TimeUnit;
*/
public class IndexerControllerContext implements ControllerContext
{
+ public static final int DEFAULT_MAX_CONCURRENT_STAGES = 1;
+
private static final Logger log = new Logger(IndexerControllerContext.class);
private final MSQControllerTask task;
@@ -96,21 +98,21 @@ public class IndexerControllerContext implements
ControllerContext
@Override
public ControllerQueryKernelConfig queryKernelConfig(
- final MSQSpec querySpec,
- final QueryDefinition queryDef
+ final String queryId,
+ final MSQSpec querySpec
)
{
final ControllerMemoryParameters memoryParameters =
ControllerMemoryParameters.createProductionInstance(
memoryIntrospector,
- queryDef.getFinalStageDefinition().getMaxWorkerCount()
+ querySpec.getTuningConfig().getMaxNumWorkers()
);
final ControllerQueryKernelConfig config =
makeQueryKernelConfig(querySpec, memoryParameters);
log.debug(
"Query[%s] using %s[%s], %s[%s], %s[%s].",
- queryDef.getQueryId(),
+ queryId,
MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE,
config.isDurableStorage(),
MultiStageQueryContext.CTX_FAULT_TOLERANCE,
@@ -210,7 +212,7 @@ public class IndexerControllerContext implements
ControllerContext
}
/**
- * Helper method for {@link #queryKernelConfig(MSQSpec, QueryDefinition)}.
Also used in tests.
+ * Helper method for {@link #queryKernelConfig(String, MSQSpec)}. Also used
in tests.
*/
public static ControllerQueryKernelConfig makeQueryKernelConfig(
final MSQSpec querySpec,
@@ -218,7 +220,8 @@ public class IndexerControllerContext implements
ControllerContext
)
{
final QueryContext queryContext = querySpec.getQuery().context();
- final int maxConcurrentStages =
MultiStageQueryContext.getMaxConcurrentStages(queryContext);
+ final int maxConcurrentStages =
+ MultiStageQueryContext.getMaxConcurrentStagesWithDefault(queryContext,
DEFAULT_MAX_CONCURRENT_STAGES);
final boolean isFaultToleranceEnabled =
MultiStageQueryContext.isFaultToleranceEnabled(queryContext);
final boolean isDurableStorageEnabled;
@@ -256,9 +259,44 @@ public class IndexerControllerContext implements
ControllerContext
.destination(querySpec.getDestination())
.maxConcurrentStages(maxConcurrentStages)
.maxRetainedPartitionSketchBytes(memoryParameters.getPartitionStatisticsMaxRetainedBytes())
+ .workerContextMap(makeWorkerContextMap(querySpec,
isDurableStorageEnabled, maxConcurrentStages))
.build();
}
+ /**
+ * Helper method for {@link #makeQueryKernelConfig} and {@link
#makeTaskContext}. Makes the worker context map,
+ * i.e., the map that will become {@link WorkOrder#getWorkerContext()}.
+ */
+ public static Map<String, Object> makeWorkerContextMap(
+ final MSQSpec querySpec,
+ final boolean durableStorageEnabled,
+ final int maxConcurrentStages
+ )
+ {
+ final QueryContext queryContext = querySpec.getQuery().context();
+ final long maxParseExceptions =
MultiStageQueryContext.getMaxParseExceptions(queryContext);
+ final boolean removeNullBytes =
MultiStageQueryContext.removeNullBytes(queryContext);
+ final boolean includeAllCounters =
MultiStageQueryContext.getIncludeAllCounters(queryContext);
+ final ImmutableMap.Builder<String, Object> builder =
ImmutableMap.builder();
+
+ builder
+ .put(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE,
durableStorageEnabled)
+ .put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, maxParseExceptions)
+ .put(MultiStageQueryContext.CTX_IS_REINDEX,
MSQControllerTask.isReplaceInputDataSourceTask(querySpec))
+ .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES,
maxConcurrentStages)
+ .put(MultiStageQueryContext.CTX_REMOVE_NULL_BYTES, removeNullBytes)
+ .put(MultiStageQueryContext.CTX_INCLUDE_ALL_COUNTERS,
includeAllCounters);
+
+ if (querySpec.getDestination().toSelectDestination() != null) {
+ builder.put(
+ MultiStageQueryContext.CTX_SELECT_DESTINATION,
+ querySpec.getDestination().toSelectDestination().getName()
+ );
+ }
+
+ return builder.build();
+ }
+
/**
* Helper method for {@link #newWorkerManager}, split out to be used in
tests.
*
@@ -271,17 +309,16 @@ public class IndexerControllerContext implements
ControllerContext
)
{
final ImmutableMap.Builder<String, Object> taskContextOverridesBuilder =
ImmutableMap.builder();
- final long maxParseExceptions =
MultiStageQueryContext.getMaxParseExceptions(querySpec.getQuery().context());
- final boolean removeNullBytes =
MultiStageQueryContext.removeNullBytes(querySpec.getQuery().context());
- final boolean includeAllCounters =
MultiStageQueryContext.getIncludeAllCounters(querySpec.getQuery().context());
- taskContextOverridesBuilder
- .put(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE,
queryKernelConfig.isDurableStorage())
- .put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, maxParseExceptions)
- .put(MultiStageQueryContext.CTX_IS_REINDEX,
MSQControllerTask.isReplaceInputDataSourceTask(querySpec))
- .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES,
queryKernelConfig.getMaxConcurrentStages())
- .put(MultiStageQueryContext.CTX_REMOVE_NULL_BYTES, removeNullBytes)
- .put(MultiStageQueryContext.CTX_INCLUDE_ALL_COUNTERS,
includeAllCounters);
+ // Put worker context into the task context. That way, workers can get
these context keys either from
+ // WorkOrder#getWorkerContext or Task#getContext.
+ taskContextOverridesBuilder.putAll(
+ makeWorkerContextMap(
+ querySpec,
+ queryKernelConfig.isDurableStorage(),
+ queryKernelConfig.getMaxConcurrentStages()
+ )
+ );
// Put the lookup loading info in the task context to facilitate selective
loading of lookups.
if (controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE)
!= null) {
@@ -297,13 +334,6 @@ public class IndexerControllerContext implements
ControllerContext
);
}
- if (querySpec.getDestination().toSelectDestination() != null) {
- taskContextOverridesBuilder.put(
- MultiStageQueryContext.CTX_SELECT_DESTINATION,
- querySpec.getDestination().toSelectDestination().getName()
- );
- }
-
// propagate the controller's tags to the worker task for enhanced metrics
reporting
@SuppressWarnings("unchecked")
Map<String, Object> tags = (Map<String, Object>)
controllerTaskContext.get(DruidMetrics.TAGS);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
index 2a7d91c40af..fbb0bff9556 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
@@ -116,7 +116,10 @@ public class IndexerWorkerContext implements WorkerContext
this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;
final QueryContext queryContext = QueryContext.of(task.getContext());
- this.maxConcurrentStages =
MultiStageQueryContext.getMaxConcurrentStages(queryContext);
+ this.maxConcurrentStages =
MultiStageQueryContext.getMaxConcurrentStagesWithDefault(
+ queryContext,
+ IndexerControllerContext.DEFAULT_MAX_CONCURRENT_STAGES
+ );
this.includeAllCounters =
MultiStageQueryContext.getIncludeAllCounters(queryContext);
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkOrder.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkOrder.java
index 0c857870210..2a45605826b 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkOrder.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkOrder.java
@@ -26,9 +26,11 @@ import com.google.common.base.Preconditions;
import org.apache.druid.msq.exec.ControllerClient;
import org.apache.druid.msq.exec.OutputChannelMode;
import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.query.QueryContext;
import javax.annotation.Nullable;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
/**
@@ -51,9 +53,18 @@ public class WorkOrder
@Nullable
private final List<String> workerIds;
+ /**
+ * Always non-null for newer controllers. This is marked nullable for
backwards-compatibility reasons.
+ */
@Nullable
private final OutputChannelMode outputChannelMode;
+ /**
+ * Always non-null for newer controllers. This is marked nullable for
backwards-compatibility reasons.
+ */
+ @Nullable
+ private final QueryContext workerContext;
+
@JsonCreator
@SuppressWarnings("rawtypes")
public WorkOrder(
@@ -63,7 +74,8 @@ public class WorkOrder
@JsonProperty("input") final List<InputSlice> workerInputs,
@JsonProperty("extra") @Nullable final ExtraInfoHolder extraInfoHolder,
@JsonProperty("workers") @Nullable final List<String> workerIds,
- @JsonProperty("output") @Nullable final OutputChannelMode
outputChannelMode
+ @JsonProperty("output") @Nullable final OutputChannelMode
outputChannelMode,
+ @JsonProperty("context") @Nullable final Map<String, Object>
workerContext
)
{
this.queryDefinition = Preconditions.checkNotNull(queryDefinition,
"queryDefinition");
@@ -73,6 +85,7 @@ public class WorkOrder
this.extraInfoHolder = extraInfoHolder;
this.workerIds = workerIds;
this.outputChannelMode = outputChannelMode;
+ this.workerContext = workerContext != null ?
QueryContext.of(workerContext) : null;
}
@JsonProperty("query")
@@ -124,6 +137,10 @@ public class WorkOrder
return outputChannelMode != null;
}
+ /**
+ * Retrieves the output channel mode set by the controller. Null means the
controller didn't set it, which means
+ * we're dealing with an older controller.
+ */
@Nullable
@JsonProperty("output")
@JsonInclude(JsonInclude.Include.NON_NULL)
@@ -132,6 +149,29 @@ public class WorkOrder
return outputChannelMode;
}
+ public boolean hasWorkerContext()
+ {
+ return workerContext != null;
+ }
+
+ /**
+ * Retrieves the query context set by the controller. Null means the
controller didn't set it, which means
+ * we're dealing with an older controller.
+ */
+ @Nullable
+ public QueryContext getWorkerContext()
+ {
+ return workerContext;
+ }
+
+ @Nullable
+ @JsonProperty("context")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Map<String, Object> getContextForSerialization()
+ {
+ return workerContext != null ? workerContext.asMap() : null;
+ }
+
@Nullable
public Object getExtraInfo()
{
@@ -155,7 +195,26 @@ public class WorkOrder
workerInputs,
extraInfoHolder,
workerIds,
- newOutputChannelMode
+ newOutputChannelMode,
+ workerContext != null ? workerContext.asMap() : null
+ );
+ }
+ }
+
+ public WorkOrder withWorkerContext(final QueryContext newContext)
+ {
+ if (Objects.equals(newContext, this.workerContext)) {
+ return this;
+ } else {
+ return new WorkOrder(
+ queryDefinition,
+ stageNumber,
+ workerNumber,
+ workerInputs,
+ extraInfoHolder,
+ workerIds,
+ outputChannelMode,
+ newContext.asMap()
);
}
}
@@ -176,7 +235,8 @@ public class WorkOrder
&& Objects.equals(workerInputs, workOrder.workerInputs)
&& Objects.equals(extraInfoHolder, workOrder.extraInfoHolder)
&& Objects.equals(workerIds, workOrder.workerIds)
- && Objects.equals(outputChannelMode, workOrder.outputChannelMode);
+ && Objects.equals(outputChannelMode, workOrder.outputChannelMode)
+ && Objects.equals(workerContext, workOrder.workerContext);
}
@Override
@@ -189,7 +249,8 @@ public class WorkOrder
workerInputs,
extraInfoHolder,
workerIds,
- outputChannelMode
+ outputChannelMode,
+ workerContext
);
}
@@ -204,6 +265,7 @@ public class WorkOrder
", extraInfoHolder=" + extraInfoHolder +
", workerIds=" + workerIds +
", outputChannelMode=" + outputChannelMode +
+ ", context=" + workerContext +
'}';
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
index b01091f9ad7..62a13326909 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
@@ -302,7 +302,8 @@ public class ControllerQueryKernel
workerInputs.inputsForWorker(workerNumber),
extraInfoHolder,
config.getWorkerIds(),
- outputChannelMode
+ outputChannelMode,
+ config.getWorkerContextMap()
);
QueryValidator.validateWorkOrder(workOrder);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfig.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfig.java
index 5c754aedd4f..f7516c63c92 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfig.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfig.java
@@ -21,9 +21,12 @@ package org.apache.druid.msq.kernel.controller;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.msq.indexing.destination.MSQDestination;
+import org.apache.druid.msq.kernel.WorkOrder;
import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
/**
@@ -37,22 +40,22 @@ public class ControllerQueryKernelConfig
private final boolean durableStorage;
private final boolean faultTolerance;
private final MSQDestination destination;
-
@Nullable
- private final String controllerId;
-
+ private final String controllerHost;
@Nullable
private final List<String> workerIds;
+ private final Map<String, Object> workerContextMap;
- private ControllerQueryKernelConfig(
+ ControllerQueryKernelConfig(
int maxRetainedPartitionSketchBytes,
int maxConcurrentStages,
boolean pipeline,
boolean durableStorage,
boolean faultTolerance,
MSQDestination destination,
- @Nullable String controllerId,
- @Nullable List<String> workerIds
+ @Nullable String controllerHost,
+ @Nullable List<String> workerIds,
+ Map<String, Object> workerContextMap
)
{
if (maxRetainedPartitionSketchBytes <= 0) {
@@ -85,8 +88,9 @@ public class ControllerQueryKernelConfig
this.durableStorage = durableStorage;
this.faultTolerance = faultTolerance;
this.destination = destination;
- this.controllerId = controllerId;
+ this.controllerHost = controllerHost;
this.workerIds = workerIds;
+ this.workerContextMap = workerContextMap;
}
public static Builder builder()
@@ -130,6 +134,14 @@ public class ControllerQueryKernelConfig
return workerIds;
}
+ /**
+ * Map to include in {@link WorkOrder}, as {@link
WorkOrder#getWorkerContext()}.
+ */
+ public Map<String, Object> getWorkerContextMap()
+ {
+ return workerContextMap;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -145,8 +157,10 @@ public class ControllerQueryKernelConfig
&& pipeline == that.pipeline
&& durableStorage == that.durableStorage
&& faultTolerance == that.faultTolerance
- && Objects.equals(controllerId, that.controllerId)
- && Objects.equals(workerIds, that.workerIds);
+ && Objects.equals(destination, that.destination)
+ && Objects.equals(controllerHost, that.controllerHost)
+ && Objects.equals(workerIds, that.workerIds)
+ && Objects.equals(workerContextMap, that.workerContextMap);
}
@Override
@@ -158,8 +172,10 @@ public class ControllerQueryKernelConfig
pipeline,
durableStorage,
faultTolerance,
- controllerId,
- workerIds
+ destination,
+ controllerHost,
+ workerIds,
+ workerContextMap
);
}
@@ -171,9 +187,11 @@ public class ControllerQueryKernelConfig
", maxConcurrentStages=" + maxConcurrentStages +
", pipeline=" + pipeline +
", durableStorage=" + durableStorage +
- ", faultTolerant=" + faultTolerance +
- ", controllerId='" + controllerId + '\'' +
+ ", faultTolerance=" + faultTolerance +
+ ", destination=" + destination +
+ ", controllerHost='" + controllerHost + '\'' +
", workerIds=" + workerIds +
+ ", workerContextMap=" + workerContextMap +
'}';
}
@@ -185,8 +203,9 @@ public class ControllerQueryKernelConfig
private boolean durableStorage;
private boolean faultTolerant;
private MSQDestination destination;
- private String controllerId;
+ private String controllerHost;
private List<String> workerIds;
+ private Map<String, Object> workerContextMap = Collections.emptyMap();
/**
* Use {@link #builder()}.
@@ -231,9 +250,9 @@ public class ControllerQueryKernelConfig
return this;
}
- public Builder controllerId(final String controllerId)
+ public Builder controllerHost(final String controllerHost)
{
- this.controllerId = controllerId;
+ this.controllerHost = controllerHost;
return this;
}
@@ -243,6 +262,12 @@ public class ControllerQueryKernelConfig
return this;
}
+ public Builder workerContextMap(final Map<String, Object> workerContextMap)
+ {
+ this.workerContextMap = workerContextMap;
+ return this;
+ }
+
public ControllerQueryKernelConfig build()
{
return new ControllerQueryKernelConfig(
@@ -252,8 +277,9 @@ public class ControllerQueryKernelConfig
durableStorage,
faultTolerant,
destination,
- controllerId,
- workerIds
+ controllerHost,
+ workerIds,
+ workerContextMap
);
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 63601c907a2..4ed98dca594 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -119,7 +119,6 @@ public class MultiStageQueryContext
public static final SegmentSource DEFAULT_INCLUDE_SEGMENT_SOURCE =
SegmentSource.NONE;
public static final String CTX_MAX_CONCURRENT_STAGES = "maxConcurrentStages";
- public static final int DEFAULT_MAX_CONCURRENT_STAGES = 1;
public static final String CTX_DURABLE_SHUFFLE_STORAGE =
"durableShuffleStorage";
private static final boolean DEFAULT_DURABLE_SHUFFLE_STORAGE = false;
public static final String CTX_SELECT_DESTINATION = "selectDestination";
@@ -206,11 +205,14 @@ public class MultiStageQueryContext
);
}
- public static int getMaxConcurrentStages(final QueryContext queryContext)
+ public static int getMaxConcurrentStagesWithDefault(
+ final QueryContext queryContext,
+ final int defaultMaxConcurrentStages
+ )
{
return queryContext.getInt(
CTX_MAX_CONCURRENT_STAGES,
- DEFAULT_MAX_CONCURRENT_STAGES
+ defaultMaxConcurrentStages
);
}
@@ -336,16 +338,6 @@ public class MultiStageQueryContext
);
}
- @Nullable
- public static MSQSelectDestination getSelectDestinationOrNull(final
QueryContext queryContext)
- {
- return QueryContexts.getAsEnum(
- CTX_SELECT_DESTINATION,
- queryContext.getString(CTX_SELECT_DESTINATION),
- MSQSelectDestination.class
- );
- }
-
public static int getRowsInMemory(final QueryContext queryContext)
{
return queryContext.getInt(CTX_ROWS_IN_MEMORY, DEFAULT_ROWS_IN_MEMORY);
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java
index d7364124483..c1d1030fb08 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java
@@ -108,6 +108,7 @@ public class QueryValidatorTest
Collections.singletonList(() -> inputFiles), // Slice with a large
number of inputFiles
null,
null,
+ null,
null
);
@@ -125,7 +126,7 @@ public class QueryValidatorTest
QueryValidator.validateWorkOrder(workOrder);
}
- private static QueryDefinition createQueryDefinition(int numColumns, int
numWorkers)
+ public static QueryDefinition createQueryDefinition(int numColumns, int
numWorkers)
{
QueryDefinitionBuilder builder =
QueryDefinition.builder(UUID.randomUUID().toString());
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java
new file mode 100644
index 00000000000..32cd36d0998
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.msq.kernel.WorkOrder;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.QueryContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class WorkerImplTest
+{
+ @Test
+ public void test_makeWorkOrderToUse_nothingMissing()
+ {
+ final WorkOrder workOrder = new WorkOrder(
+ QueryValidatorTest.createQueryDefinition(10, 2),
+ 0,
+ 0,
+ Collections.singletonList(() -> 1),
+ null,
+ null,
+ OutputChannelMode.MEMORY,
+ ImmutableMap.of("foo", "bar")
+ );
+
+ Assert.assertSame(
+ workOrder,
+ WorkerImpl.makeWorkOrderToUse(
+ workOrder,
+ QueryContext.of(ImmutableMap.of("foo", "baz")) /* Conflicts with workOrder context; should be ignored */
+ )
+ );
+ }
+
+ @Test
+ public void test_makeWorkOrderToUse_missingOutputChannelModeAndWorkerContext()
+ {
+ final Map<String, Object> taskContext =
+ ImmutableMap.of("foo", "bar", MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, true);
+
+ final WorkOrder workOrder = new WorkOrder(
+ QueryValidatorTest.createQueryDefinition(10, 2),
+ 1,
+ 2,
+ Collections.singletonList(() -> 1),
+ null,
+ null,
+ null,
+ null
+ );
+
+ Assert.assertEquals(
+ new WorkOrder(
+ workOrder.getQueryDefinition(),
+ workOrder.getStageNumber(),
+ workOrder.getWorkerNumber(),
+ workOrder.getInputs(),
+ null,
+ null,
+ OutputChannelMode.DURABLE_STORAGE_INTERMEDIATE,
+ taskContext
+ ),
+ WorkerImpl.makeWorkOrderToUse(workOrder, QueryContext.of(taskContext))
+ );
+ }
+}
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfigTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfigTest.java
new file mode 100644
index 00000000000..765101359f6
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelConfigTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.kernel.controller;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
+import org.apache.druid.msq.indexing.destination.MSQDestination;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+public class ControllerQueryKernelConfigTest
+{
+ @Test
+ public void testBuilder()
+ {
+ int maxRetainedPartitionSketchBytes = 1;
+ int maxConcurrentStages = 2;
+ boolean pipeline = false;
+ boolean durableStorage = true;
+ boolean faultTolerance = true;
+ MSQDestination destination = DurableStorageMSQDestination.instance();
+ String controllerHost = "controllerHost";
+ List<String> workerIds = ImmutableList.of("worker1", "worker2");
+ Map<String, Object> workerContextMap = ImmutableMap.of("foo", "bar");
+
+ final ControllerQueryKernelConfig config1 = new ControllerQueryKernelConfig(
+ maxRetainedPartitionSketchBytes,
+ maxConcurrentStages,
+ pipeline,
+ durableStorage,
+ faultTolerance,
+ destination,
+ controllerHost,
+ workerIds,
+ workerContextMap
+ );
+
+ final ControllerQueryKernelConfig config2 = ControllerQueryKernelConfig
+ .builder()
+ .maxRetainedPartitionSketchBytes(maxRetainedPartitionSketchBytes)
+ .maxConcurrentStages(maxConcurrentStages)
+ .pipeline(pipeline)
+ .durableStorage(durableStorage)
+ .faultTolerance(faultTolerance)
+ .destination(destination)
+ .controllerHost(controllerHost)
+ .workerIds(workerIds)
+ .workerContextMap(workerContextMap)
+ .build();
+
+ Assert.assertEquals(config1, config2);
+ }
+
+ @Test
+ public void testEquals()
+ {
+ EqualsVerifier.forClass(ControllerQueryKernelConfig.class)
+ .usingGetClass()
+ .verify();
+ }
+}
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 3034be39984..ed518afd2ef 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -59,7 +59,6 @@ import org.apache.druid.msq.indexing.MSQWorkerTask;
import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.input.table.TableInputSpecSlicer;
-import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
@@ -269,7 +268,7 @@ public class MSQTestControllerContext implements ControllerContext
};
@Override
- public ControllerQueryKernelConfig queryKernelConfig(MSQSpec querySpec, QueryDefinition queryDef)
+ public ControllerQueryKernelConfig queryKernelConfig(String queryId, MSQSpec querySpec)
{
return IndexerControllerContext.makeQueryKernelConfig(querySpec, new ControllerMemoryParameters(100_000_000));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]