kfaraz commented on code in PR #16041:
URL: https://github.com/apache/druid/pull/16041#discussion_r1554523657

##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskContextEnricher.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+/**
+ * The TaskContextEnricher interface enhances Druid tasks by appending contextual information.
+ * By infusing tasks with additional context, it aims to improve aspects of task management,
+ * monitoring, and analysis. This contextual information aids in clarifying the intent and
+ * specifics of tasks within metrics and reporting systems.
+ */
+public interface TaskContextEnricher
+{
+  /**
+   * Augments a task's context with additional information. This method introduces or updates
+   * context entries to better describe the task. Such enriched context is pivotal for generating
+   * detailed task reports and for incorporating as dimensions within metrics reporting. It ensures
+   * tasks are more accurately represented and managed by providing deeper insights into task execution
+   * and performance.
+   *
+   * @param task The Druid task to be augmented with additional context. This process may either
+   *             supplement existing context entries or introduce new ones, thereby refining the
+   *             task's narrative and operational details.
+   */
+  void enrich(Task task);

Review Comment:
   ```suggestion
     void enrichContext(Task task);
   ```


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -697,6 +702,11 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
         MSQControllerTask.isReplaceInputDataSourceTask(task)
     );
 
+    // propagate the controller's context and tags to the worker task
+    taskContextOverridesBuilder.put(MultiStageQueryContext.CTX_OF_CONTROLLER, task.getContext());

Review Comment:
   I am not sure if this is advisable. Why should we pass the entire context of the controller to the worker?
   When the worker is submitted to the overlord, the `TaskContextEnricher` would be invoked for the worker anyway, right?

   cc: @cryptoe


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -697,6 +702,11 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
         MSQControllerTask.isReplaceInputDataSourceTask(task)
     );
 
+    // propagate the controller's context and tags to the worker task
+    taskContextOverridesBuilder.put(MultiStageQueryContext.CTX_OF_CONTROLLER, task.getContext());
+    // specifically assign the 'tags' field for enhanced worker task metrics reporting
+    taskContextOverridesBuilder.put(DruidMetrics.TAGS, task.getContextValue(DruidMetrics.TAGS, new HashMap()));

Review Comment:
   ```suggestion
       taskContextOverridesBuilder.put(DruidMetrics.TAGS, task.getContextValue(DruidMetrics.TAGS, new HashMap<>()));
   ```


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -569,11 +570,15 @@ public TaskStatus runTask(final Closer closer)
           ),
           stagesReport,
           countersSnapshot,
-          resultsReport
+          resultsReport,
+          task.getContext()
       );
 
       context.writeReports(
           id(),
-          TaskReport.buildTaskReports(new MSQTaskReport(id(), taskReportPayload))
+          TaskReport.buildTaskReports(new MSQTaskReport(
+              id(),
+              taskReportPayload
+          ))

Review Comment:
   Formatting:
   ```suggestion
             TaskReport.buildTaskReports(
                 new MSQTaskReport(id(), taskReportPayload)
             )
   ```


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQTaskReportPayload.java:
##########
@@ -38,19 +39,23 @@ public class MSQTaskReportPayload
   @Nullable
   private final MSQResultsReport results;
+  @Nullable
+  private final Map<String, Object> taskContext;
 
   @JsonCreator
   public MSQTaskReportPayload(
       @JsonProperty("status") MSQStatusReport status,
       @JsonProperty("stages") @Nullable MSQStagesReport stages,
       @JsonProperty("counters") @Nullable CounterSnapshotsTree counters,
-      @JsonProperty("results") @Nullable MSQResultsReport results
+      @JsonProperty("results") @Nullable MSQResultsReport results,
+      @JsonProperty("taskContext") @Nullable final Map<String, Object> taskContext

Review Comment:
   Once you introduce a new report type for containing the taskContext, you can remove this field from this class.


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTaskContextEnricher.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+public class NoopTaskContextEnricher implements TaskContextEnricher
+{
+  public static final String TYPE = "default";

Review Comment:
   Type should be `noop` for this.


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrors.java:
##########
@@ -48,7 +49,8 @@ public IngestionStatsAndErrors(
     @JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs,
     @JsonProperty("recordsProcessed") Map<String, Long> recordsProcessed,
     @Nullable @JsonProperty("segmentsRead") Long segmentsRead,
-    @Nullable @JsonProperty("segmentsPublished") Long segmentsPublished
+    @Nullable @JsonProperty("segmentsPublished") Long segmentsPublished,
+    @Nullable @JsonProperty("taskContext") Map<String, Object> taskContext

Review Comment:
   taskContext should not be a part of this report. You should create a new report type.

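
To illustrate the request for a separate report type, here is a minimal sketch of a standalone report that carries only the task context, so that neither `IngestionStatsAndErrors` nor `MSQTaskReportPayload` needs a `taskContext` field. Everything named here is an assumption for illustration, not Druid's API: the class name `TaskContextReportSketch`, the report key `taskContext`, the `buildReports` helper, and the plain-Java shape. A real implementation would presumably implement Druid's `TaskReport` interface and carry Jackson annotations, both omitted to keep the sketch self-contained.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch of a report type that holds only the task context.
 * It does not implement Druid's TaskReport interface and has no Jackson
 * annotations; both are left out so the example compiles on its own.
 */
final class TaskContextReportSketch
{
  // Assumed report key; the actual key is a choice for the PR author.
  static final String REPORT_KEY = "taskContext";

  private final String taskId;
  private final Map<String, Object> taskContext;

  TaskContextReportSketch(String taskId, Map<String, Object> taskContext)
  {
    this.taskId = taskId;
    // Defensive, read-only copy so later changes to the caller's map
    // do not leak into an already-built report. Null becomes an empty map.
    this.taskContext = taskContext == null
                       ? Collections.<String, Object>emptyMap()
                       : Collections.unmodifiableMap(new LinkedHashMap<>(taskContext));
  }

  String getTaskId()
  {
    return taskId;
  }

  String getReportKey()
  {
    return REPORT_KEY;
  }

  Map<String, Object> getPayload()
  {
    return taskContext;
  }

  /**
   * Hypothetical helper: builds a map of reports keyed by report key, with
   * the task context as its own entry next to the existing stats report.
   * The "ingestionStatsAndErrors" key is used here as an assumed placeholder.
   */
  static Map<String, Object> buildReports(
      String taskId,
      Object ingestionStats,
      Map<String, Object> taskContext
  )
  {
    final Map<String, Object> reports = new LinkedHashMap<>();
    reports.put("ingestionStatsAndErrors", ingestionStats);
    reports.put(REPORT_KEY, new TaskContextReportSketch(taskId, taskContext));
    return reports;
  }
}
```

With this shape, the existing report payloads keep their current constructor signatures, and consumers that want the context look it up under its own key instead of digging into an unrelated payload.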
