kfaraz commented on code in PR #16041:
URL: https://github.com/apache/druid/pull/16041#discussion_r1554523657


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskContextEnricher.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+/**
+ * The TaskContextEnricher interface enhances Druid tasks by appending contextual information.
+ * By infusing tasks with additional context, it aims to improve aspects of task management,
+ * monitoring, and analysis. This contextual information aids in clarifying the intent and
+ * specifics of tasks within metrics and reporting systems.
+ */
+public interface TaskContextEnricher
+{
+  /**
+   * Augments a task's context with additional information. This method introduces or updates
+   * context entries to better describe the task. Such enriched context is pivotal for generating
+   * detailed task reports and for incorporating as dimensions within metrics reporting. It ensures
+   * tasks are more accurately represented and managed by providing deeper insights into task execution
+   * and performance.
+   *
+   * @param task The Druid task to be augmented with additional context. This process may either
+   *             supplement existing context entries or introduce new ones, thereby refining the
+   *             task's narrative and operational details.
+   */
+  void enrich(Task task);

Review Comment:
   ```suggestion
     void enrichContext(Task task);
   ```
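To illustrate how the renamed method reads at a call site, here is a minimal sketch. It does not use the real Druid classes: `SimpleTask` is a hypothetical stand-in for `Task` that models only the context map, and `ClusterNameEnricher` and the `clusterName` key are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class EnrichContextSketch
{
  /** Hypothetical stand-in for Druid's Task; only the context map is modelled. */
  static class SimpleTask
  {
    private final Map<String, Object> context = new HashMap<>();

    Map<String, Object> getContext()
    {
      return context;
    }

    void addToContext(String key, Object value)
    {
      context.put(key, value);
    }
  }

  /** Mirrors the interface under review, with the method renamed as suggested. */
  interface TaskContextEnricher
  {
    void enrichContext(SimpleTask task);
  }

  /** Hypothetical enricher that records which cluster the task was submitted to. */
  static class ClusterNameEnricher implements TaskContextEnricher
  {
    private final String clusterName;

    ClusterNameEnricher(String clusterName)
    {
      this.clusterName = clusterName;
    }

    @Override
    public void enrichContext(SimpleTask task)
    {
      // "clusterName" is an assumed key, not one defined by Druid.
      task.addToContext("clusterName", clusterName);
    }
  }

  public static void main(String[] args)
  {
    SimpleTask task = new SimpleTask();
    new ClusterNameEnricher("staging").enrichContext(task);
    System.out.println(task.getContext());
  }
}
```

With the longer name, the call `enricher.enrichContext(task)` makes it clear that only the task's context is modified, not the task as a whole.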



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -697,6 +702,11 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
         MSQControllerTask.isReplaceInputDataSourceTask(task)
     );
 
+    // propagate the controller's context and tags to the worker task
+    taskContextOverridesBuilder.put(MultiStageQueryContext.CTX_OF_CONTROLLER, task.getContext());

Review Comment:
   I am not sure if this is advisable. Why should we pass the entire context of 
the controller to the worker? When the worker is submitted to the overlord, the 
`TaskContextEnricher` would be invoked for the worker anyway, right?
   
   cc: @cryptoe 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -697,6 +702,11 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
         MSQControllerTask.isReplaceInputDataSourceTask(task)
     );
 
+    // propagate the controller's context and tags to the worker task
+    taskContextOverridesBuilder.put(MultiStageQueryContext.CTX_OF_CONTROLLER, task.getContext());
+    // specifically assign the 'tags' field for enhanced worker task metrics reporting
+    taskContextOverridesBuilder.put(DruidMetrics.TAGS, task.getContextValue(DruidMetrics.TAGS, new HashMap()));

Review Comment:
   ```suggestion
       taskContextOverridesBuilder.put(DruidMetrics.TAGS, task.getContextValue(DruidMetrics.TAGS, new HashMap<>()));
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -569,11 +570,15 @@ public TaskStatus runTask(final Closer closer)
           ),
           stagesReport,
           countersSnapshot,
-          resultsReport
+          resultsReport,
+          task.getContext()
       );
       context.writeReports(
           id(),
-          TaskReport.buildTaskReports(new MSQTaskReport(id(), taskReportPayload))
+          TaskReport.buildTaskReports(new MSQTaskReport(
+              id(),
+              taskReportPayload
+          ))

Review Comment:
   Formatting:
   ```suggestion
             TaskReport.buildTaskReports(
                 new MSQTaskReport(id(), taskReportPayload)
             )
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQTaskReportPayload.java:
##########
@@ -38,19 +39,23 @@ public class MSQTaskReportPayload
 
   @Nullable
   private final MSQResultsReport results;
+  @Nullable
+  private final Map<String, Object> taskContext;
 
   @JsonCreator
   public MSQTaskReportPayload(
       @JsonProperty("status") MSQStatusReport status,
       @JsonProperty("stages") @Nullable MSQStagesReport stages,
       @JsonProperty("counters") @Nullable CounterSnapshotsTree counters,
-      @JsonProperty("results") @Nullable MSQResultsReport results
+      @JsonProperty("results") @Nullable MSQResultsReport results,
+      @JsonProperty("taskContext") @Nullable final Map<String, Object> taskContext

Review Comment:
   Once you introduce a new report type to hold the `taskContext`, you can
remove this field from this class.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTaskContextEnricher.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+public class NoopTaskContextEnricher implements TaskContextEnricher
+{
+  public static final String TYPE = "default";

Review Comment:
   Type should be `noop` for this.
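A minimal sketch of what the no-op implementation could look like with the suggested type name. `SimpleTask` is a hypothetical stand-in for Druid's `Task`, and the method name `enrichContext` follows the rename suggested earlier in this review. How the `TYPE` string is bound to the implementation in Druid's configuration is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

public class NoopEnricherSketch
{
  /** Hypothetical stand-in for Druid's Task; only the context map is modelled. */
  static class SimpleTask
  {
    private final Map<String, Object> context = new HashMap<>();

    Map<String, Object> getContext()
    {
      return context;
    }
  }

  /** Simplified version of the interface under review. */
  interface TaskContextEnricher
  {
    void enrichContext(SimpleTask task);
  }

  /** Does nothing, so the type name says exactly that. */
  static class NoopTaskContextEnricher implements TaskContextEnricher
  {
    public static final String TYPE = "noop";

    @Override
    public void enrichContext(SimpleTask task)
    {
      // Intentionally empty: the task context is left untouched.
    }
  }

  public static void main(String[] args)
  {
    SimpleTask task = new SimpleTask();
    new NoopTaskContextEnricher().enrichContext(task);
    System.out.println(NoopTaskContextEnricher.TYPE + " -> " + task.getContext());
  }
}
```

Naming the type after the behaviour keeps it accurate even if the default enricher later changes to a different implementation.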



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrors.java:
##########
@@ -48,7 +49,8 @@ public IngestionStatsAndErrors(
       @JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs,
       @JsonProperty("recordsProcessed") Map<String, Long> recordsProcessed,
       @Nullable @JsonProperty("segmentsRead") Long segmentsRead,
-      @Nullable @JsonProperty("segmentsPublished") Long segmentsPublished
+      @Nullable @JsonProperty("segmentsPublished") Long segmentsPublished,
+      @Nullable @JsonProperty("taskContext") Map<String, Object> taskContext

Review Comment:
   `taskContext` should not be part of this report. You should create a new
report type for it.
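One possible shape for such a separate report type is sketched below. It is not the Druid API: the `Report` interface, the `TaskContextReport` class, the `"taskContext"` report key, and the `buildReports` helper are all assumptions made for illustration, and the Jackson annotations a real report would need are omitted to keep the example self-contained.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class TaskContextReportSketch
{
  /** Hypothetical minimal report contract; Druid's real TaskReport differs. */
  interface Report
  {
    String getTaskId();

    String getReportKey();

    Object getPayload();
  }

  /** Carries only the task context, so other report payloads stay unchanged. */
  static class TaskContextReport implements Report
  {
    static final String REPORT_KEY = "taskContext"; // assumed key

    private final String taskId;
    private final Map<String, Object> taskContext;

    TaskContextReport(String taskId, Map<String, Object> taskContext)
    {
      this.taskId = taskId;
      // Defensive copy; a null context becomes an empty map.
      this.taskContext = taskContext == null
                         ? Collections.<String, Object>emptyMap()
                         : Collections.unmodifiableMap(new LinkedHashMap<>(taskContext));
    }

    @Override
    public String getTaskId()
    {
      return taskId;
    }

    @Override
    public String getReportKey()
    {
      return REPORT_KEY;
    }

    @Override
    public Object getPayload()
    {
      return taskContext;
    }
  }

  /** Collects reports into a map keyed by report key, preserving order. */
  static Map<String, Report> buildReports(Report... reports)
  {
    Map<String, Report> byKey = new LinkedHashMap<>();
    for (Report report : reports) {
      byKey.put(report.getReportKey(), report);
    }
    return byKey;
  }

  public static void main(String[] args)
  {
    Map<String, Object> context = new LinkedHashMap<>();
    context.put("tags", Collections.singletonMap("team", "ingest"));
    Map<String, Report> reports = buildReports(new TaskContextReport("task-1", context));
    System.out.println(reports.keySet());
  }
}
```

Keeping the context in its own report means the constructors of existing payload classes do not need an extra nullable argument.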



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

