This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new da9feb44309 Introduce TaskContextReport for reporting task context (#16041)
da9feb44309 is described below

commit da9feb4430929e79ac4d9c4003e11120913a3f09
Author: YongGang <[email protected]>
AuthorDate: Thu Apr 11 20:27:49 2024 -0700

    Introduce TaskContextReport for reporting task context (#16041)
    
    Changes:
    - Add `TaskContextEnricher` interface to improve task management and 
monitoring
    - Invoke `enrichContext` in `TaskQueue.add()` whenever a new task is 
submitted to the Overlord
    - Add `TaskContextReport` to write out task context information in reports
---
 docs/ingestion/tasks.md                            |  8 +++
 docs/operations/metrics.md                         |  2 +-
 .../org/apache/druid/msq/exec/ControllerImpl.java  | 15 +++-
 .../druid/msq/util/MultiStageQueryContext.java     |  7 ++
 .../druid/indexing/common/TaskContextReport.java   | 84 ++++++++++++++++++++++
 .../apache/druid/indexing/common/TaskReport.java   |  3 +-
 .../common/task/AbstractBatchIndexTask.java        |  7 +-
 .../task/AppenderatorDriverRealtimeIndexTask.java  |  4 +-
 .../common/task/NoopTaskContextEnricher.java       | 30 ++++++++
 .../indexing/common/task/TaskContextEnricher.java  | 45 ++++++++++++
 .../apache/druid/indexing/overlord/TaskMaster.java |  7 +-
 .../apache/druid/indexing/overlord/TaskQueue.java  |  8 ++-
 .../SeekableStreamIndexTaskRunner.java             |  4 +-
 .../indexing/common/task/TaskReportSerdeTest.java  | 18 ++++-
 .../concurrent/ConcurrentReplaceAndAppendTest.java |  4 +-
 .../druid/indexing/overlord/TaskLifecycleTest.java | 14 +++-
 .../indexing/overlord/TaskLockConfigTest.java      |  4 +-
 .../indexing/overlord/TaskQueueScaleTest.java      |  4 +-
 .../druid/indexing/overlord/TaskQueueTest.java     | 13 ++--
 .../druid/indexing/overlord/http/OverlordTest.java |  4 +-
 .../cases/cluster/Common/dependencies.yaml         |  4 +-
 .../testsEx/indexer/AbstractITBatchIndexTest.java  |  5 ++
 .../druid/testsEx/msq/ITMultiStageQuery.java       | 10 ++-
 .../clients/OverlordResourceTestClient.java        | 14 ++--
 .../clients/msq/MsqOverlordResourceTestClient.java |  6 +-
 .../druid/testing/utils/MsqTestQueryHelper.java    |  7 +-
 .../tests/indexer/AbstractITBatchIndexTest.java    |  6 ++
 .../druid/tests/indexer/ITCompactionTaskTest.java  | 29 ++++----
 .../java/org/apache/druid/cli/CliOverlord.java     | 14 ++++
 .../main/java/org/apache/druid/cli/CliPeon.java    | 17 +++--
 30 files changed, 344 insertions(+), 53 deletions(-)

diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md
index ab206c75762..aaed12d7f4d 100644
--- a/docs/ingestion/tasks.md
+++ b/docs/ingestion/tasks.md
@@ -89,6 +89,14 @@ An example output is shown below:
       "errorMsg": null
     },
     "type": "ingestionStatsAndErrors"
+  },
+  "taskContext": {
+    "type": "taskContext",
+    "taskId": "compact_twitter_2018-09-24T18:24:23.920Z",
+    "payload": {
+      "forceTimeChunkLock": true,
+      "useLineageBasedSegmentAllocation": true
+    }
   }
 }
 ```
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index bf9d63a93b8..dde1c7f64f8 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -378,7 +378,7 @@ These metrics are for the Druid Coordinator and are reset 
each time the Coordina
 
 |Metric|Description|Dimensions|Normal value|
 |------|-----------|----------|------------|
-| `service/heartbeat` | Metric indicating the service is up. 
`ServiceStatusMonitor` must be enabled. | `leader` on the Overlord and 
Coordinator.<br />`workerVersion`, `category`, `status` on the Middle 
Manager.<br />`taskId`, `groupId`, `taskType`, `dataSource` on the Peon |1|
+| `service/heartbeat` | Metric indicating the service is up. This metric is 
emitted only when `ServiceStatusMonitor` is enabled. | `leader` on the Overlord 
and Coordinator.<br />`workerVersion`, `category`, `status` on the Middle 
Manager.<br />`taskId`, `groupId`, `taskType`, `dataSource`, `tags` on the Peon 
|1|
 
 ### Historical
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index e7759bc47b4..63aaf7b4de8 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -69,6 +69,7 @@ import 
org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskContextReport;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskReport;
@@ -190,6 +191,7 @@ import org.apache.druid.msq.util.MSQFutureUtils;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.msq.util.PassthroughAggregatorFactory;
 import org.apache.druid.msq.util.SqlStatementResourceHelper;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -589,7 +591,10 @@ public class ControllerImpl implements Controller
       );
       context.writeReports(
           id(),
-          TaskReport.buildTaskReports(new MSQTaskReport(id(), 
taskReportPayload))
+          TaskReport.buildTaskReports(
+              new MSQTaskReport(id(), taskReportPayload),
+              new TaskContextReport(id(), task.getContext())
+          )
       );
     }
     catch (Throwable e) {
@@ -713,6 +718,14 @@ public class ControllerImpl implements Controller
         MSQControllerTask.isReplaceInputDataSourceTask(task)
     );
 
+    // propagate the controller's context and tags to the worker task
+    taskContextOverridesBuilder.put(MultiStageQueryContext.CTX_OF_CONTROLLER, 
task.getContext());
+    // specifically assign the 'tags' field for enhanced worker task metrics 
reporting
+    Map<String, Object> tags = task.getContextValue(DruidMetrics.TAGS);
+    if (tags != null) {
+      taskContextOverridesBuilder.put(DruidMetrics.TAGS, tags);
+    }
+
     this.workerTaskLauncher = new MSQWorkerTaskLauncher(
         id(),
         task.getDataSource(),
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 60734b5b1da..0fd8e4f51f3 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -143,6 +143,13 @@ public class MultiStageQueryContext
 
   public static final String CTX_IS_REINDEX = "isReindex";
 
+  /**
+   * Key for controller task's context passed to worker tasks.
+   * Facilitates sharing the controller's execution environment
+   * and configurations with its associated worker tasks.
+   */
+  public static final String CTX_OF_CONTROLLER = "controllerCtx";
+
   /**
    * Controls sort order within segments. Normally, this is the same as the 
overall order of the query (from the
    * CLUSTERED BY clause) but it can be overridden.
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskContextReport.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskContextReport.java
new file mode 100644
index 00000000000..743f216908d
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskContextReport.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.Objects;
+
+public class TaskContextReport implements TaskReport
+{
+  public static final String REPORT_KEY = "taskContext";
+  private final String taskId;
+  @Nullable
+  private final Map<String, Object> taskContext;
+
+  @JsonCreator
+  public TaskContextReport(
+      @JsonProperty("taskId") final String taskId,
+      @JsonProperty("payload") @Nullable final Map<String, Object> taskContext
+  )
+  {
+    this.taskId = taskId;
+    this.taskContext = taskContext;
+  }
+
+  @Override
+  @JsonProperty
+  public String getTaskId()
+  {
+    return taskId;
+  }
+
+  @Override
+  public String getReportKey()
+  {
+    return REPORT_KEY;
+  }
+
+  @Override
+  @JsonProperty
+  public Map<String, Object> getPayload()
+  {
+    return taskContext;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TaskContextReport that = (TaskContextReport) o;
+    return Objects.equals(taskId, that.taskId) && Objects.equals(taskContext, 
that.taskContext);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(taskId, taskContext);
+  }
+}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
index 86ed562112f..adada4f708d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
@@ -35,7 +35,8 @@ import java.util.LinkedHashMap;
         name = IngestionStatsAndErrorsTaskReport.REPORT_KEY,
         value = IngestionStatsAndErrorsTaskReport.class
     ),
-    @JsonSubTypes.Type(name = KillTaskReport.REPORT_KEY, value = 
KillTaskReport.class)
+    @JsonSubTypes.Type(name = KillTaskReport.REPORT_KEY, value = 
KillTaskReport.class),
+    @JsonSubTypes.Type(name = TaskContextReport.REPORT_KEY, value = 
TaskContextReport.class)
 })
 public interface TaskReport
 {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 7c3608cd8d5..4486f837376 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -34,6 +34,7 @@ import org.apache.druid.indexer.IngestionState;
 import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
 import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskContextReport;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskReport;
@@ -919,7 +920,8 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
                 null,
                 null
             )
-        )
+        ),
+        new TaskContextReport(getId(), getContext())
     );
   }
 
@@ -948,7 +950,8 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
                 segmentsRead,
                 segmentsPublished
             )
-        )
+        ),
+        new TaskContextReport(getId(), getContext())
     );
   }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 2e4710d43cd..19f8a775b27 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -45,6 +45,7 @@ import 
org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
 import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
 import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskContextReport;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
@@ -625,7 +626,8 @@ public class AppenderatorDriverRealtimeIndexTask extends 
AbstractTask implements
                 null,
                 null
             )
-        )
+        ),
+        new TaskContextReport(getId(), getContext())
     );
   }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTaskContextEnricher.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTaskContextEnricher.java
new file mode 100644
index 00000000000..3ec25f99514
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTaskContextEnricher.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+public class NoopTaskContextEnricher implements TaskContextEnricher
+{
+  public static final String TYPE = "noop";
+
+  @Override
+  public void enrichContext(Task task)
+  {
+  }
+}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskContextEnricher.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskContextEnricher.java
new file mode 100644
index 00000000000..baa6141e6ae
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskContextEnricher.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.guice.annotations.UnstableApi;
+
+/**
+ * The TaskContextEnricher interface enhances Druid tasks by appending 
contextual information.
+ * By infusing tasks with additional context, it aims to improve aspects of 
task management,
+ * monitoring, and analysis. This contextual information aids in clarifying 
the intent and
+ * specifics of tasks within metrics and reporting systems.
+ */
+@UnstableApi
+public interface TaskContextEnricher
+{
+  /**
+   * Augments a task's context with additional information. This method 
introduces or updates
+   * context entries to better describe the task. Such enriched context is 
pivotal for generating
+   * detailed task reports and for incorporating as dimensions within metrics 
reporting. It ensures
+   * tasks are more accurately represented and managed by providing deeper 
insights into task execution
+   * and performance.
+   *
+   * @param task The Druid task to be augmented with additional context. This 
process may either
+   *             supplement existing context entries or introduce new ones, 
thereby refining the
+   *             task's narrative and operational details.
+   */
+  void enrichContext(Task task);
+}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
index 9829e1ffa19..5103f9bd87e 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
@@ -31,6 +31,7 @@ import 
org.apache.druid.indexing.common.actions.SegmentAllocationQueue;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.TaskContextEnricher;
 import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
 import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
 import org.apache.druid.indexing.overlord.config.TaskLockConfig;
@@ -96,7 +97,8 @@ public class TaskMaster implements TaskCountStatsProvider, 
TaskSlotCountStatsPro
       final OverlordDutyExecutor overlordDutyExecutor,
       @IndexingService final DruidLeaderSelector overlordLeaderSelector,
       final SegmentAllocationQueue segmentAllocationQueue,
-      final ObjectMapper mapper
+      final ObjectMapper mapper,
+      final TaskContextEnricher taskContextEnricher
   )
   {
     this.supervisorManager = supervisorManager;
@@ -128,7 +130,8 @@ public class TaskMaster implements TaskCountStatsProvider, 
TaskSlotCountStatsPro
               taskActionClientFactory,
               taskLockbox,
               emitter,
-              mapper
+              mapper,
+              taskContextEnricher
           );
 
           // Sensible order to start stuff:
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index fb3f68c2394..a143b93e9ef 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -43,6 +43,7 @@ import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.TaskContextEnricher;
 import org.apache.druid.indexing.common.task.Tasks;
 import 
org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException;
 import 
org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner;
@@ -121,6 +122,7 @@ public class TaskQueue
   private final TaskLockbox taskLockbox;
   private final ServiceEmitter emitter;
   private final ObjectMapper passwordRedactingMapper;
+  private final TaskContextEnricher taskContextEnricher;
 
   private final ReentrantLock giant = new ReentrantLock(true);
   @SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
@@ -165,7 +167,8 @@ public class TaskQueue
       TaskActionClientFactory taskActionClientFactory,
       TaskLockbox taskLockbox,
       ServiceEmitter emitter,
-      ObjectMapper mapper
+      ObjectMapper mapper,
+      TaskContextEnricher taskContextEnricher
   )
   {
     this.lockConfig = Preconditions.checkNotNull(lockConfig, "lockConfig");
@@ -182,6 +185,7 @@ public class TaskQueue
     );
     this.passwordRedactingMapper = mapper.copy()
                                          .addMixIn(PasswordProvider.class, 
PasswordProviderRedactionMixIn.class);
+    this.taskContextEnricher = Preconditions.checkNotNull(taskContextEnricher, 
"taskContextEnricher");
   }
 
   @VisibleForTesting
@@ -512,6 +516,8 @@ public class TaskQueue
         
SinglePhaseParallelIndexTaskRunner.DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION
     );
 
+    taskContextEnricher.enrichContext(task);
+
     giant.lock();
 
     try {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index e5a1958a1fa..12ed2348310 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -54,6 +54,7 @@ import 
org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
 import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskContextReport;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
@@ -1134,7 +1135,8 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
                 null,
                 null
             )
-        )
+        ),
+        new TaskContextReport(task.getId(), task.getContext())
     );
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
index b0fced552b3..404b1cde1f5 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
@@ -30,6 +30,7 @@ import 
org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
 import org.apache.druid.indexing.common.KillTaskReport;
 import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskContextReport;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TestUtils;
 import org.junit.Assert;
@@ -81,6 +82,22 @@ public class TaskReportSerdeTest
     Assert.assertEquals(originalReport, deserializedReport);
   }
 
+  @Test
+  public void testSerdeOfTaskContextReport() throws Exception
+  {
+    TaskContextReport originalReport = new TaskContextReport(
+        "taskId",
+        ImmutableMap.of("key1", "value1", "key2", "value2")
+    );
+    String reportJson = jsonMapper.writeValueAsString(originalReport);
+    TaskReport deserialized = jsonMapper.readValue(reportJson, 
TaskReport.class);
+
+    Assert.assertTrue(deserialized instanceof TaskContextReport);
+
+    TaskContextReport deserializedReport = (TaskContextReport) deserialized;
+    Assert.assertEquals(originalReport, deserializedReport);
+  }
+
   @Test
   public void testWriteReportMapToFileAndRead() throws Exception
   {
@@ -145,7 +162,6 @@ public class TaskReportSerdeTest
         )
     );
 
-
     Assert.assertEquals(expected, jsonMapper.readValue(
         json,
         new TypeReference<TaskReport>()
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index 864d84dbfa1..cc498c797bc 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.config.TaskConfigBuilder;
 import org.apache.druid.indexing.common.task.IngestionTestBase;
 import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.indexing.overlord.Segments;
@@ -143,7 +144,8 @@ public class ConcurrentReplaceAndAppendTest extends 
IngestionTestBase
         taskActionClientFactory,
         getLockbox(),
         new NoopServiceEmitter(),
-        getObjectMapper()
+        getObjectMapper(),
+        new NoopTaskContextEnricher()
     );
     runningTasks.clear();
     taskQueue.start();
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index b60bc10a8de..0046645106c 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -81,6 +81,7 @@ import 
org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
 import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
 import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
 import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
 import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
 import org.apache.druid.indexing.common.task.RealtimeIndexTask;
 import org.apache.druid.indexing.common.task.Task;
@@ -696,7 +697,18 @@ public class TaskLifecycleTest extends 
InitializedNullHandlingTest
         TaskQueueConfig.class
     );
 
-    return new TaskQueue(lockConfig, tqc, new DefaultTaskConfig(), ts, tr, 
tac, taskLockbox, emitter, mapper);
+    return new TaskQueue(
+        lockConfig,
+        tqc,
+        new DefaultTaskConfig(),
+        ts,
+        tr,
+        tac,
+        taskLockbox,
+        emitter,
+        mapper,
+        new NoopTaskContextEnricher()
+    );
   }
 
   @After
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
index 1c90802f480..9d408365daa 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
@@ -24,6 +24,7 @@ import 
org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
@@ -122,7 +123,8 @@ public class TaskLockConfigTest
         actionClientFactory,
         lockbox,
         emitter,
-        new DefaultObjectMapper()
+        new DefaultObjectMapper(),
+        new NoopTaskContextEnricher()
     );
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
index 1d478d9cd14..1caf74c6681 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
@@ -33,6 +33,7 @@ import 
org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
 import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
@@ -123,7 +124,8 @@ public class TaskQueueScaleTest
         unsupportedTaskActionFactory, // Not used for anything serious
         new TaskLockbox(taskStorage, storageCoordinator),
         new NoopServiceEmitter(),
-        jsonMapper
+        jsonMapper,
+        new NoopTaskContextEnricher()
     );
 
     taskQueue.start();
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index 969a08abd03..d34cb62d57c 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -50,6 +50,7 @@ import 
org.apache.druid.indexing.common.actions.TaskActionClientFactory;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
 import org.apache.druid.indexing.common.task.IngestionTestBase;
+import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.Tasks;
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
@@ -132,7 +133,8 @@ public class TaskQueueTest extends IngestionTestBase
         actionClientFactory,
         getLockbox(),
         serviceEmitter,
-        getObjectMapper()
+        getObjectMapper(),
+        new NoopTaskContextEnricher()
     );
     taskQueue.setActive();
   }
@@ -340,7 +342,8 @@ public class TaskQueueTest extends IngestionTestBase
         actionClientFactory,
         getLockbox(),
         serviceEmitter,
-        getObjectMapper()
+        getObjectMapper(),
+        new NoopTaskContextEnricher()
     );
     taskQueue.setActive();
     final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"));
@@ -427,7 +430,8 @@ public class TaskQueueTest extends IngestionTestBase
         actionClientFactory,
         getLockbox(),
         serviceEmitter,
-        getObjectMapper()
+        getObjectMapper(),
+        new NoopTaskContextEnricher()
     );
     taskQueue.setActive();
 
@@ -471,7 +475,8 @@ public class TaskQueueTest extends IngestionTestBase
         createActionClientFactory(),
         new TaskLockbox(taskStorage, new 
TestIndexerMetadataStorageCoordinator()),
         new StubServiceEmitter("druid/overlord", "testHost"),
-        mapper
+        mapper,
+        new NoopTaskContextEnricher()
     );
 
     final DataSchema dataSchema = new DataSchema(
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index 066e4c6c58a..862d3aace3c 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -45,6 +45,7 @@ import 
org.apache.druid.indexing.common.actions.SegmentAllocationQueue;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
@@ -249,7 +250,8 @@ public class OverlordTest
         EasyMock.createNiceMock(OverlordDutyExecutor.class),
         new TestDruidLeaderSelector(),
         EasyMock.createNiceMock(SegmentAllocationQueue.class),
-        new DefaultObjectMapper()
+        new DefaultObjectMapper(),
+        new NoopTaskContextEnricher()
     );
     EmittingLogger.registerEmitter(serviceEmitter);
   }
diff --git a/integration-tests-ex/cases/cluster/Common/dependencies.yaml 
b/integration-tests-ex/cases/cluster/Common/dependencies.yaml
index d43d92b0db2..10b01c46896 100644
--- a/integration-tests-ex/cases/cluster/Common/dependencies.yaml
+++ b/integration-tests-ex/cases/cluster/Common/dependencies.yaml
@@ -25,6 +25,8 @@ services:
   # Uses the official Zookeeper image
   # See https://hub.docker.com/_/zookeeper
   zookeeper:
+    # Uncomment the following when running on Apple Silicon processors:
+    # platform: linux/x86_64
     image: zookeeper:${ZK_VERSION}
     container_name: zookeeper
     labels:
@@ -71,7 +73,7 @@ services:
   # See https://hub.docker.com/_/mysql
 # The image will initialize the user and DB upon first start.
   metadata:
-    # Uncomment the following when running on M1 Macs:
+    # Uncomment the following when running on Apple Silicon processors:
     # platform: linux/x86_64
     image: mysql:$MYSQL_IMAGE_VERSION
     container_name: metadata
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
index 8eadbdf8f11..8a2891c162d 100644
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
+++ 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
@@ -25,6 +25,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.druid.indexer.partitions.SecondaryPartitionType;
 import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.TaskContextReport;
 import org.apache.druid.indexing.common.TaskReport;
 import 
org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionCardinalityTask;
 import 
org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask;
@@ -499,6 +500,10 @@ public abstract class AbstractITBatchIndexTest extends 
AbstractIndexerTest
             segmentAvailabilityConfirmationPair.rhs
         );
       }
+
+      TaskContextReport taskContextReport = (TaskContextReport) 
indexer.getTaskReport(taskID).get(TaskContextReport.REPORT_KEY);
+
+      Assert.assertFalse(taskContextReport.getPayload().isEmpty());
     }
 
     // IT*ParallelIndexTest do a second round of ingestion to replace segments 
in an existing
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
index 38681c3d63a..d110eb5a4a9 100644
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
+++ 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
@@ -22,6 +22,8 @@ package org.apache.druid.testsEx.msq;
 import com.google.api.client.util.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
+import org.apache.druid.indexing.common.TaskContextReport;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.Yielder;
@@ -234,11 +236,15 @@ public class ITMultiStageQuery
 
     msqHelper.pollTaskIdForSuccess(resultTaskStatus.getTaskId());
 
-    Map<String, MSQTaskReport> statusReport = 
msqHelper.fetchStatusReports(resultTaskStatus.getTaskId());
-    MSQTaskReport taskReport = statusReport.get(MSQTaskReport.REPORT_KEY);
+    Map<String, TaskReport> statusReport = 
msqHelper.fetchStatusReports(resultTaskStatus.getTaskId());
+
+    MSQTaskReport taskReport = (MSQTaskReport) 
statusReport.get(MSQTaskReport.REPORT_KEY);
     if (taskReport == null) {
      throw new ISE("Unable to fetch the status report for the task [%s]", 
resultTaskStatus.getTaskId());
     }
+    TaskContextReport taskContextReport = (TaskContextReport) 
statusReport.get(TaskContextReport.REPORT_KEY);
+    Assert.assertFalse(taskContextReport.getPayload().isEmpty());
+
     MSQTaskReportPayload taskReportPayload = Preconditions.checkNotNull(
         taskReport.getPayload(),
         "payload"
diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index ceaeddea0dc..a0c364ced24 100644
--- 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -27,6 +27,7 @@ import org.apache.druid.client.indexing.TaskStatusResponse;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.overlord.http.TaskPayloadResponse;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.java.util.common.ISE;
@@ -262,15 +263,16 @@ public class OverlordResourceTestClient
 
   public String getTaskErrorMessage(String taskId)
   {
-    return 
getTaskReport(taskId).get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
-                                .getPayload().getErrorMsg();
+    return ((IngestionStatsAndErrorsTaskReport) 
getTaskReport(taskId).get(IngestionStatsAndErrorsTaskReport.REPORT_KEY))
+        .getPayload().getErrorMsg();
   }
 
   public RowIngestionMetersTotals getTaskStats(String taskId)
   {
     try {
-      Object buildSegment = 
getTaskReport(taskId).get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
-                                                 
.getPayload().getRowStats().get("buildSegments");
+      Object buildSegment = ((IngestionStatsAndErrorsTaskReport) 
getTaskReport(taskId).get(
+          IngestionStatsAndErrorsTaskReport.REPORT_KEY))
+          .getPayload().getRowStats().get("buildSegments");
       return jsonMapper.convertValue(buildSegment, 
RowIngestionMetersTotals.class);
     }
     catch (Exception e) {
@@ -278,7 +280,7 @@ public class OverlordResourceTestClient
     }
   }
 
-  public Map<String, IngestionStatsAndErrorsTaskReport> getTaskReport(String 
taskId)
+  public Map<String, TaskReport> getTaskReport(String taskId)
   {
     try {
       StatusResponseHolder response = makeRequest(
@@ -291,7 +293,7 @@ public class OverlordResourceTestClient
       );
       return jsonMapper.readValue(
           response.getContent(),
-          new TypeReference<Map<String, IngestionStatsAndErrorsTaskReport>>()
+          new TypeReference<Map<String, TaskReport>>()
           {
           }
       );
diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java
 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java
index c9291c45320..f647a75e33a 100644
--- 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java
+++ 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java
@@ -23,11 +23,11 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
 import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
 import org.apache.druid.msq.guice.MSQIndexingModule;
-import org.apache.druid.msq.indexing.report.MSQTaskReport;
 import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.clients.OverlordResourceTestClient;
 import org.apache.druid.testing.guice.TestClient;
@@ -54,7 +54,7 @@ public class MsqOverlordResourceTestClient extends 
OverlordResourceTestClient
     this.jsonMapper.registerModules(new 
MSQIndexingModule().getJacksonModules());
   }
 
-  public Map<String, MSQTaskReport> getMsqTaskReport(String taskId)
+  public Map<String, TaskReport> getMsqTaskReport(String taskId)
   {
     try {
       StatusResponseHolder response = makeRequest(
@@ -65,7 +65,7 @@ public class MsqOverlordResourceTestClient extends 
OverlordResourceTestClient
               StringUtils.format("task/%s/reports", 
StringUtils.urlEncode(taskId))
           )
       );
-      return jsonMapper.readValue(response.getContent(), new 
TypeReference<Map<String, MSQTaskReport>>()
+      return jsonMapper.readValue(response.getContent(), new 
TypeReference<Map<String, TaskReport>>()
       {
       });
     }
diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
 
b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
index 3d20e31f566..e98793b3c8c 100644
--- 
a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
+++ 
b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
@@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.RetryUtils;
@@ -185,7 +186,7 @@ public class MsqTestQueryHelper extends 
AbstractTestQueryHelper<MsqQueryWithResu
   /**
    * Fetches status reports for a given task
    */
-  public Map<String, MSQTaskReport> fetchStatusReports(String taskId)
+  public Map<String, TaskReport> fetchStatusReports(String taskId)
   {
     return overlordClient.getMsqTaskReport(taskId);
   }
@@ -195,8 +196,8 @@ public class MsqTestQueryHelper extends 
AbstractTestQueryHelper<MsqQueryWithResu
    */
   private void compareResults(String taskId, MsqQueryWithResults 
expectedQueryWithResults)
   {
-    Map<String, MSQTaskReport> statusReport = fetchStatusReports(taskId);
-    MSQTaskReport taskReport = statusReport.get(MSQTaskReport.REPORT_KEY);
+    Map<String, TaskReport> statusReport = fetchStatusReports(taskId);
+    MSQTaskReport taskReport = (MSQTaskReport) 
statusReport.get(MSQTaskReport.REPORT_KEY);
     if (taskReport == null) {
       throw new ISE("Unable to fetch the status report for the task [%]", 
taskId);
     }
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index d419bbb48d3..f4a2cdca4b5 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -25,6 +25,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.druid.indexer.partitions.SecondaryPartitionType;
 import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.TaskContextReport;
 import org.apache.druid.indexing.common.TaskReport;
 import 
org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionCardinalityTask;
 import 
org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask;
@@ -382,6 +383,11 @@ public abstract class AbstractITBatchIndexTest extends 
AbstractIndexerTest
             segmentAvailabilityConfirmationPair.rhs
         );
       }
+
+      TaskContextReport taskContextReport =
+          (TaskContextReport) 
indexer.getTaskReport(taskID).get(TaskContextReport.REPORT_KEY);
+
+      Assert.assertFalse(taskContextReport.getPayload().isEmpty());
     }
 
    // IT*ParallelIndexTest do a second round of ingestion to replace 
segments in an existing
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
index 2b37c7a27d2..9bfc2ad02e5 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
@@ -23,6 +23,7 @@ import com.google.inject.Inject;
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.indexing.common.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.GranularityType;
@@ -170,20 +171,24 @@ public class ITCompactionTaskTest extends 
AbstractIndexerTest
 
       checkCompactionIntervals(expectedIntervalAfterCompaction);
 
-      Map<String, IngestionStatsAndErrorsTaskReport> reports = 
indexer.getTaskReport(taskId);
+      Map<String, TaskReport> reports = indexer.getTaskReport(taskId);
       Assert.assertTrue(reports != null && reports.size() > 0);
 
-      Assert.assertEquals(2,
-                          reports.values()
-                                 .stream()
-                                 .mapToLong(r -> ((IngestionStatsAndErrors) 
r.getPayload()).getSegmentsPublished())
-                                 .sum()
+      Assert.assertEquals(
+          2,
+          reports.values()
+                 .stream()
+                 .filter(r -> r instanceof IngestionStatsAndErrorsTaskReport)
+                 .mapToLong(r -> ((IngestionStatsAndErrors) 
r.getPayload()).getSegmentsPublished())
+                 .sum()
       );
-      Assert.assertEquals(4,
-                          reports.values()
-                                 .stream()
-                                 .mapToLong(r -> ((IngestionStatsAndErrors) 
r.getPayload()).getSegmentsRead())
-                                 .sum()
+      Assert.assertEquals(
+          4,
+          reports.values()
+                 .stream()
+                 .filter(r -> r instanceof IngestionStatsAndErrorsTaskReport)
+                 .mapToLong(r -> ((IngestionStatsAndErrors) 
r.getPayload()).getSegmentsRead())
+                 .sum()
       );
     }
   }
@@ -268,7 +273,7 @@ public class ITCompactionTaskTest extends 
AbstractIndexerTest
       }
       checkCompactionIntervals(expectedIntervalAfterCompaction);
 
-      Map<String, IngestionStatsAndErrorsTaskReport> reports = 
indexer.getTaskReport(taskId);
+      Map<String, TaskReport> reports = indexer.getTaskReport(taskId);
       Assert.assertTrue(reports != null && reports.size() > 0);
     }
   }
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java 
b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index f48c9be01b6..771e76561ad 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -62,6 +62,8 @@ import 
org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import 
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
+import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
+import org.apache.druid.indexing.common.task.TaskContextEnricher;
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProvider;
 import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
 import org.apache.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
@@ -238,6 +240,18 @@ public class CliOverlord extends ServerRunnable
                 .in(LazySingleton.class);
             
binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
 
+            PolyBind.optionBinder(binder, Key.get(TaskContextEnricher.class))
+                    .addBinding(NoopTaskContextEnricher.TYPE)
+                    .to(NoopTaskContextEnricher.class)
+                    .in(LazySingleton.class);
+
+            PolyBind.createChoiceWithDefault(
+                binder,
+                "druid.indexer.task.contextenricher.type",
+                Key.get(TaskContextEnricher.class),
+                NoopTaskContextEnricher.TYPE
+            );
+
             configureTaskStorage(binder);
             configureIntermediaryData(binder);
             configureAutoscale(binder);
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java 
b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 1283809fccb..92699694dda 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -308,13 +308,18 @@ public class CliPeon extends GuiceRunnable
           @Named(ServiceStatusMonitor.HEARTBEAT_TAGS_BINDING)
           public Supplier<Map<String, Object>> heartbeatDimensions(Task task)
           {
+            ImmutableMap.Builder<String, Object> builder = 
ImmutableMap.builder();
+            builder.put(DruidMetrics.TASK_ID, task.getId());
+            builder.put(DruidMetrics.DATASOURCE, task.getDataSource());
+            builder.put(DruidMetrics.TASK_TYPE, task.getType());
+            builder.put(DruidMetrics.GROUP_ID, task.getGroupId());
+            Map<String, Object> tags = task.getContextValue(DruidMetrics.TAGS);
+            if (tags != null && !tags.isEmpty()) {
+              builder.put(DruidMetrics.TAGS, tags);
+            }
+
             return Suppliers.ofInstance(
-                ImmutableMap.of(
-                    DruidMetrics.TASK_ID, task.getId(),
-                    DruidMetrics.DATASOURCE, task.getDataSource(),
-                    DruidMetrics.TASK_TYPE, task.getType(),
-                    DruidMetrics.GROUP_ID, task.getGroupId()
-                )
+                builder.build()
             );
           }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to