This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new da9feb44309 Introduce TaskContextReport for reporting task context
(#16041)
da9feb44309 is described below
commit da9feb4430929e79ac4d9c4003e11120913a3f09
Author: YongGang <[email protected]>
AuthorDate: Thu Apr 11 20:27:49 2024 -0700
Introduce TaskContextReport for reporting task context (#16041)
Changes:
- Add `TaskContextEnricher` interface to improve task management and
monitoring
- Invoke `enrichContext` in `TaskQueue.add()` whenever a new task is
submitted to the Overlord
- Add `TaskContextReport` to write out task context information in reports
---
docs/ingestion/tasks.md | 8 +++
docs/operations/metrics.md | 2 +-
.../org/apache/druid/msq/exec/ControllerImpl.java | 15 +++-
.../druid/msq/util/MultiStageQueryContext.java | 7 ++
.../druid/indexing/common/TaskContextReport.java | 84 ++++++++++++++++++++++
.../apache/druid/indexing/common/TaskReport.java | 3 +-
.../common/task/AbstractBatchIndexTask.java | 7 +-
.../task/AppenderatorDriverRealtimeIndexTask.java | 4 +-
.../common/task/NoopTaskContextEnricher.java | 30 ++++++++
.../indexing/common/task/TaskContextEnricher.java | 45 ++++++++++++
.../apache/druid/indexing/overlord/TaskMaster.java | 7 +-
.../apache/druid/indexing/overlord/TaskQueue.java | 8 ++-
.../SeekableStreamIndexTaskRunner.java | 4 +-
.../indexing/common/task/TaskReportSerdeTest.java | 18 ++++-
.../concurrent/ConcurrentReplaceAndAppendTest.java | 4 +-
.../druid/indexing/overlord/TaskLifecycleTest.java | 14 +++-
.../indexing/overlord/TaskLockConfigTest.java | 4 +-
.../indexing/overlord/TaskQueueScaleTest.java | 4 +-
.../druid/indexing/overlord/TaskQueueTest.java | 13 ++--
.../druid/indexing/overlord/http/OverlordTest.java | 4 +-
.../cases/cluster/Common/dependencies.yaml | 4 +-
.../testsEx/indexer/AbstractITBatchIndexTest.java | 5 ++
.../druid/testsEx/msq/ITMultiStageQuery.java | 10 ++-
.../clients/OverlordResourceTestClient.java | 14 ++--
.../clients/msq/MsqOverlordResourceTestClient.java | 6 +-
.../druid/testing/utils/MsqTestQueryHelper.java | 7 +-
.../tests/indexer/AbstractITBatchIndexTest.java | 6 ++
.../druid/tests/indexer/ITCompactionTaskTest.java | 29 ++++----
.../java/org/apache/druid/cli/CliOverlord.java | 14 ++++
.../main/java/org/apache/druid/cli/CliPeon.java | 17 +++--
30 files changed, 344 insertions(+), 53 deletions(-)
diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md
index ab206c75762..aaed12d7f4d 100644
--- a/docs/ingestion/tasks.md
+++ b/docs/ingestion/tasks.md
@@ -89,6 +89,14 @@ An example output is shown below:
"errorMsg": null
},
"type": "ingestionStatsAndErrors"
+ },
+ "taskContext": {
+ "type": "taskContext",
+ "taskId": "compact_twitter_2018-09-24T18:24:23.920Z",
+ "payload": {
+ "forceTimeChunkLock": true,
+ "useLineageBasedSegmentAllocation": true
+ }
}
}
```
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index bf9d63a93b8..dde1c7f64f8 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -378,7 +378,7 @@ These metrics are for the Druid Coordinator and are reset
each time the Coordina
|Metric|Description|Dimensions|Normal value|
|------|-----------|----------|------------|
-| `service/heartbeat` | Metric indicating the service is up.
`ServiceStatusMonitor` must be enabled. | `leader` on the Overlord and
Coordinator.<br />`workerVersion`, `category`, `status` on the Middle
Manager.<br />`taskId`, `groupId`, `taskType`, `dataSource` on the Peon |1|
+| `service/heartbeat` | Metric indicating the service is up. This metric is
emitted only when `ServiceStatusMonitor` is enabled. | `leader` on the Overlord
and Coordinator.<br />`workerVersion`, `category`, `status` on the Middle
Manager.<br />`taskId`, `groupId`, `taskType`, `dataSource`, `tags` on the Peon
|1|
### Historical
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index e7759bc47b4..63aaf7b4de8 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -69,6 +69,7 @@ import
org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskContextReport;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskReport;
@@ -190,6 +191,7 @@ import org.apache.druid.msq.util.MSQFutureUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.msq.util.PassthroughAggregatorFactory;
import org.apache.druid.msq.util.SqlStatementResourceHelper;
+import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -589,7 +591,10 @@ public class ControllerImpl implements Controller
);
context.writeReports(
id(),
- TaskReport.buildTaskReports(new MSQTaskReport(id(),
taskReportPayload))
+ TaskReport.buildTaskReports(
+ new MSQTaskReport(id(), taskReportPayload),
+ new TaskContextReport(id(), task.getContext())
+ )
);
}
catch (Throwable e) {
@@ -713,6 +718,14 @@ public class ControllerImpl implements Controller
MSQControllerTask.isReplaceInputDataSourceTask(task)
);
+ // propagate the controller's context and tags to the worker task
+ taskContextOverridesBuilder.put(MultiStageQueryContext.CTX_OF_CONTROLLER,
task.getContext());
+ // specifically assign the 'tags' field for enhanced worker task metrics
reporting
+ Map<String, Object> tags = task.getContextValue(DruidMetrics.TAGS);
+ if (tags != null) {
+ taskContextOverridesBuilder.put(DruidMetrics.TAGS, tags);
+ }
+
this.workerTaskLauncher = new MSQWorkerTaskLauncher(
id(),
task.getDataSource(),
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 60734b5b1da..0fd8e4f51f3 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -143,6 +143,13 @@ public class MultiStageQueryContext
public static final String CTX_IS_REINDEX = "isReindex";
+ /**
+ * Key for controller task's context passed to worker tasks.
+ * Facilitates sharing the controller's execution environment
+ * and configurations with its associated worker tasks.
+ */
+ public static final String CTX_OF_CONTROLLER = "controllerCtx";
+
/**
* Controls sort order within segments. Normally, this is the same as the
overall order of the query (from the
* CLUSTERED BY clause) but it can be overridden.
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskContextReport.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskContextReport.java
new file mode 100644
index 00000000000..743f216908d
--- /dev/null
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskContextReport.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.Objects;
+
+public class TaskContextReport implements TaskReport
+{
+ public static final String REPORT_KEY = "taskContext";
+ private final String taskId;
+ @Nullable
+ private final Map<String, Object> taskContext;
+
+ @JsonCreator
+ public TaskContextReport(
+ @JsonProperty("taskId") final String taskId,
+ @JsonProperty("payload") @Nullable final Map<String, Object> taskContext
+ )
+ {
+ this.taskId = taskId;
+ this.taskContext = taskContext;
+ }
+
+ @Override
+ @JsonProperty
+ public String getTaskId()
+ {
+ return taskId;
+ }
+
+ @Override
+ public String getReportKey()
+ {
+ return REPORT_KEY;
+ }
+
+ @Override
+ @JsonProperty
+ public Map<String, Object> getPayload()
+ {
+ return taskContext;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TaskContextReport that = (TaskContextReport) o;
+ return Objects.equals(taskId, that.taskId) && Objects.equals(taskContext,
that.taskContext);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(taskId, taskContext);
+ }
+}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
index 86ed562112f..adada4f708d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReport.java
@@ -35,7 +35,8 @@ import java.util.LinkedHashMap;
name = IngestionStatsAndErrorsTaskReport.REPORT_KEY,
value = IngestionStatsAndErrorsTaskReport.class
),
- @JsonSubTypes.Type(name = KillTaskReport.REPORT_KEY, value =
KillTaskReport.class)
+ @JsonSubTypes.Type(name = KillTaskReport.REPORT_KEY, value =
KillTaskReport.class),
+ @JsonSubTypes.Type(name = TaskContextReport.REPORT_KEY, value =
TaskContextReport.class)
})
public interface TaskReport
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 7c3608cd8d5..4486f837376 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -34,6 +34,7 @@ import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskContextReport;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskReport;
@@ -919,7 +920,8 @@ public abstract class AbstractBatchIndexTask extends
AbstractTask
null,
null
)
- )
+ ),
+ new TaskContextReport(getId(), getContext())
);
}
@@ -948,7 +950,8 @@ public abstract class AbstractBatchIndexTask extends
AbstractTask
segmentsRead,
segmentsPublished
)
- )
+ ),
+ new TaskContextReport(getId(), getContext())
);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 2e4710d43cd..19f8a775b27 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -45,6 +45,7 @@ import
org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskContextReport;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
@@ -625,7 +626,8 @@ public class AppenderatorDriverRealtimeIndexTask extends
AbstractTask implements
null,
null
)
- )
+ ),
+ new TaskContextReport(getId(), getContext())
);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTaskContextEnricher.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTaskContextEnricher.java
new file mode 100644
index 00000000000..3ec25f99514
--- /dev/null
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTaskContextEnricher.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+public class NoopTaskContextEnricher implements TaskContextEnricher
+{
+ public static final String TYPE = "noop";
+
+ @Override
+ public void enrichContext(Task task)
+ {
+ }
+}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskContextEnricher.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskContextEnricher.java
new file mode 100644
index 00000000000..baa6141e6ae
--- /dev/null
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskContextEnricher.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.guice.annotations.UnstableApi;
+
+/**
+ * The TaskContextEnricher interface enhances Druid tasks by appending
contextual information.
+ * By infusing tasks with additional context, it aims to improve aspects of
task management,
+ * monitoring, and analysis. This contextual information aids in clarifying
the intent and
+ * specifics of tasks within metrics and reporting systems.
+ */
+@UnstableApi
+public interface TaskContextEnricher
+{
+ /**
+ * Augments a task's context with additional information. This method
introduces or updates
+ * context entries to better describe the task. Such enriched context is
pivotal for generating
+ * detailed task reports and for incorporating as dimensions within metrics
reporting. It ensures
+ * tasks are more accurately represented and managed by providing deeper
insights into task execution
+ * and performance.
+ *
+ * @param task The Druid task to be augmented with additional context. This
process may either
+ * supplement existing context entries or introduce new ones,
thereby refining the
+ * task's narrative and operational details.
+ */
+ void enrichContext(Task task);
+}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
index 9829e1ffa19..5103f9bd87e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
@@ -31,6 +31,7 @@ import
org.apache.druid.indexing.common.actions.SegmentAllocationQueue;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.TaskContextEnricher;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
@@ -96,7 +97,8 @@ public class TaskMaster implements TaskCountStatsProvider,
TaskSlotCountStatsPro
final OverlordDutyExecutor overlordDutyExecutor,
@IndexingService final DruidLeaderSelector overlordLeaderSelector,
final SegmentAllocationQueue segmentAllocationQueue,
- final ObjectMapper mapper
+ final ObjectMapper mapper,
+ final TaskContextEnricher taskContextEnricher
)
{
this.supervisorManager = supervisorManager;
@@ -128,7 +130,8 @@ public class TaskMaster implements TaskCountStatsProvider,
TaskSlotCountStatsPro
taskActionClientFactory,
taskLockbox,
emitter,
- mapper
+ mapper,
+ taskContextEnricher
);
// Sensible order to start stuff:
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index fb3f68c2394..a143b93e9ef 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -43,6 +43,7 @@ import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.TaskContextEnricher;
import org.apache.druid.indexing.common.task.Tasks;
import
org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException;
import
org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner;
@@ -121,6 +122,7 @@ public class TaskQueue
private final TaskLockbox taskLockbox;
private final ServiceEmitter emitter;
private final ObjectMapper passwordRedactingMapper;
+ private final TaskContextEnricher taskContextEnricher;
private final ReentrantLock giant = new ReentrantLock(true);
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
@@ -165,7 +167,8 @@ public class TaskQueue
TaskActionClientFactory taskActionClientFactory,
TaskLockbox taskLockbox,
ServiceEmitter emitter,
- ObjectMapper mapper
+ ObjectMapper mapper,
+ TaskContextEnricher taskContextEnricher
)
{
this.lockConfig = Preconditions.checkNotNull(lockConfig, "lockConfig");
@@ -182,6 +185,7 @@ public class TaskQueue
);
this.passwordRedactingMapper = mapper.copy()
.addMixIn(PasswordProvider.class,
PasswordProviderRedactionMixIn.class);
+ this.taskContextEnricher = Preconditions.checkNotNull(taskContextEnricher,
"taskContextEnricher");
}
@VisibleForTesting
@@ -512,6 +516,8 @@ public class TaskQueue
SinglePhaseParallelIndexTaskRunner.DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION
);
+ taskContextEnricher.enrichContext(task);
+
giant.lock();
try {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index e5a1958a1fa..12ed2348310 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -54,6 +54,7 @@ import
org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskContextReport;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
@@ -1134,7 +1135,8 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
null,
null
)
- )
+ ),
+ new TaskContextReport(task.getId(), task.getContext())
);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
index b0fced552b3..404b1cde1f5 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java
@@ -30,6 +30,7 @@ import
org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.KillTaskReport;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskContextReport;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TestUtils;
import org.junit.Assert;
@@ -81,6 +82,22 @@ public class TaskReportSerdeTest
Assert.assertEquals(originalReport, deserializedReport);
}
+ @Test
+ public void testSerdeOfTaskContextReport() throws Exception
+ {
+ TaskContextReport originalReport = new TaskContextReport(
+ "taskId",
+ ImmutableMap.of("key1", "value1", "key2", "value2")
+ );
+ String reportJson = jsonMapper.writeValueAsString(originalReport);
+ TaskReport deserialized = jsonMapper.readValue(reportJson,
TaskReport.class);
+
+ Assert.assertTrue(deserialized instanceof TaskContextReport);
+
+ TaskContextReport deserializedReport = (TaskContextReport) deserialized;
+ Assert.assertEquals(originalReport, deserializedReport);
+ }
+
@Test
public void testWriteReportMapToFileAndRead() throws Exception
{
@@ -145,7 +162,6 @@ public class TaskReportSerdeTest
)
);
-
Assert.assertEquals(expected, jsonMapper.readValue(
json,
new TypeReference<TaskReport>()
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index 864d84dbfa1..cc498c797bc 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.IngestionTestBase;
import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.overlord.Segments;
@@ -143,7 +144,8 @@ public class ConcurrentReplaceAndAppendTest extends
IngestionTestBase
taskActionClientFactory,
getLockbox(),
new NoopServiceEmitter(),
- getObjectMapper()
+ getObjectMapper(),
+ new NoopTaskContextEnricher()
);
runningTasks.clear();
taskQueue.start();
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index b60bc10a8de..0046645106c 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -81,6 +81,7 @@ import
org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
import org.apache.druid.indexing.common.task.RealtimeIndexTask;
import org.apache.druid.indexing.common.task.Task;
@@ -696,7 +697,18 @@ public class TaskLifecycleTest extends
InitializedNullHandlingTest
TaskQueueConfig.class
);
- return new TaskQueue(lockConfig, tqc, new DefaultTaskConfig(), ts, tr,
tac, taskLockbox, emitter, mapper);
+ return new TaskQueue(
+ lockConfig,
+ tqc,
+ new DefaultTaskConfig(),
+ ts,
+ tr,
+ tac,
+ taskLockbox,
+ emitter,
+ mapper,
+ new NoopTaskContextEnricher()
+ );
}
@After
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
index 1c90802f480..9d408365daa 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
@@ -24,6 +24,7 @@ import
org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
@@ -122,7 +123,8 @@ public class TaskLockConfigTest
actionClientFactory,
lockbox,
emitter,
- new DefaultObjectMapper()
+ new DefaultObjectMapper(),
+ new NoopTaskContextEnricher()
);
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
index 1d478d9cd14..1caf74c6681 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
@@ -33,6 +33,7 @@ import
org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
@@ -123,7 +124,8 @@ public class TaskQueueScaleTest
unsupportedTaskActionFactory, // Not used for anything serious
new TaskLockbox(taskStorage, storageCoordinator),
new NoopServiceEmitter(),
- jsonMapper
+ jsonMapper,
+ new NoopTaskContextEnricher()
);
taskQueue.start();
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index 969a08abd03..d34cb62d57c 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -50,6 +50,7 @@ import
org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.IngestionTestBase;
+import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
@@ -132,7 +133,8 @@ public class TaskQueueTest extends IngestionTestBase
actionClientFactory,
getLockbox(),
serviceEmitter,
- getObjectMapper()
+ getObjectMapper(),
+ new NoopTaskContextEnricher()
);
taskQueue.setActive();
}
@@ -340,7 +342,8 @@ public class TaskQueueTest extends IngestionTestBase
actionClientFactory,
getLockbox(),
serviceEmitter,
- getObjectMapper()
+ getObjectMapper(),
+ new NoopTaskContextEnricher()
);
taskQueue.setActive();
final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"));
@@ -427,7 +430,8 @@ public class TaskQueueTest extends IngestionTestBase
actionClientFactory,
getLockbox(),
serviceEmitter,
- getObjectMapper()
+ getObjectMapper(),
+ new NoopTaskContextEnricher()
);
taskQueue.setActive();
@@ -471,7 +475,8 @@ public class TaskQueueTest extends IngestionTestBase
createActionClientFactory(),
new TaskLockbox(taskStorage, new
TestIndexerMetadataStorageCoordinator()),
new StubServiceEmitter("druid/overlord", "testHost"),
- mapper
+ mapper,
+ new NoopTaskContextEnricher()
);
final DataSchema dataSchema = new DataSchema(
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index 066e4c6c58a..862d3aace3c 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -45,6 +45,7 @@ import
org.apache.druid.indexing.common.actions.SegmentAllocationQueue;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
@@ -249,7 +250,8 @@ public class OverlordTest
EasyMock.createNiceMock(OverlordDutyExecutor.class),
new TestDruidLeaderSelector(),
EasyMock.createNiceMock(SegmentAllocationQueue.class),
- new DefaultObjectMapper()
+ new DefaultObjectMapper(),
+ new NoopTaskContextEnricher()
);
EmittingLogger.registerEmitter(serviceEmitter);
}
diff --git a/integration-tests-ex/cases/cluster/Common/dependencies.yaml
b/integration-tests-ex/cases/cluster/Common/dependencies.yaml
index d43d92b0db2..10b01c46896 100644
--- a/integration-tests-ex/cases/cluster/Common/dependencies.yaml
+++ b/integration-tests-ex/cases/cluster/Common/dependencies.yaml
@@ -25,6 +25,8 @@ services:
# Uses the official Zookeeper image
# See https://hub.docker.com/_/zookeeper
zookeeper:
+ # Uncomment the following when running on Apple Silicon processors:
+ # platform: linux/x86_64
image: zookeeper:${ZK_VERSION}
container_name: zookeeper
labels:
@@ -71,7 +73,7 @@ services:
# See https://hub.docker.com/_/mysql
# The image will intialize the user and DB upon first start.
metadata:
- # Uncomment the following when running on M1 Macs:
+ # Uncomment the following when running on Apple Silicon processors:
# platform: linux/x86_64
image: mysql:$MYSQL_IMAGE_VERSION
container_name: metadata
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
index 8eadbdf8f11..8a2891c162d 100644
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
+++
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java
@@ -25,6 +25,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.partitions.SecondaryPartitionType;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.TaskContextReport;
import org.apache.druid.indexing.common.TaskReport;
import
org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionCardinalityTask;
import
org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask;
@@ -499,6 +500,10 @@ public abstract class AbstractITBatchIndexTest extends
AbstractIndexerTest
segmentAvailabilityConfirmationPair.rhs
);
}
+
+ TaskContextReport taskContextReport = (TaskContextReport)
indexer.getTaskReport(taskID).get(TaskContextReport.REPORT_KEY);
+
+ Assert.assertFalse(taskContextReport.getPayload().isEmpty());
}
// IT*ParallelIndexTest do a second round of ingestion to replace segments
in an existing
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
index 38681c3d63a..d110eb5a4a9 100644
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
+++
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
@@ -22,6 +22,8 @@ package org.apache.druid.testsEx.msq;
import com.google.api.client.util.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
+import org.apache.druid.indexing.common.TaskContextReport;
+import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Yielder;
@@ -234,11 +236,15 @@ public class ITMultiStageQuery
msqHelper.pollTaskIdForSuccess(resultTaskStatus.getTaskId());
- Map<String, MSQTaskReport> statusReport =
msqHelper.fetchStatusReports(resultTaskStatus.getTaskId());
- MSQTaskReport taskReport = statusReport.get(MSQTaskReport.REPORT_KEY);
+ Map<String, TaskReport> statusReport =
msqHelper.fetchStatusReports(resultTaskStatus.getTaskId());
+
+ MSQTaskReport taskReport = (MSQTaskReport)
statusReport.get(MSQTaskReport.REPORT_KEY);
if (taskReport == null) {
throw new ISE("Unable to fetch the status report for the task [%]",
resultTaskStatus.getTaskId());
}
+ TaskContextReport taskContextReport = (TaskContextReport)
statusReport.get(TaskContextReport.REPORT_KEY);
+ Assert.assertFalse(taskContextReport.getPayload().isEmpty());
+
MSQTaskReportPayload taskReportPayload = Preconditions.checkNotNull(
taskReport.getPayload(),
"payload"
diff --git
a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index ceaeddea0dc..a0c364ced24 100644
---
a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++
b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -27,6 +27,7 @@ import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.overlord.http.TaskPayloadResponse;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.java.util.common.ISE;
@@ -262,15 +263,16 @@ public class OverlordResourceTestClient
public String getTaskErrorMessage(String taskId)
{
- return
getTaskReport(taskId).get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
- .getPayload().getErrorMsg();
+ return ((IngestionStatsAndErrorsTaskReport)
getTaskReport(taskId).get(IngestionStatsAndErrorsTaskReport.REPORT_KEY))
+ .getPayload().getErrorMsg();
}
public RowIngestionMetersTotals getTaskStats(String taskId)
{
try {
- Object buildSegment =
getTaskReport(taskId).get(IngestionStatsAndErrorsTaskReport.REPORT_KEY)
-
.getPayload().getRowStats().get("buildSegments");
+ Object buildSegment = ((IngestionStatsAndErrorsTaskReport)
getTaskReport(taskId).get(
+ IngestionStatsAndErrorsTaskReport.REPORT_KEY))
+ .getPayload().getRowStats().get("buildSegments");
return jsonMapper.convertValue(buildSegment,
RowIngestionMetersTotals.class);
}
catch (Exception e) {
@@ -278,7 +280,7 @@ public class OverlordResourceTestClient
}
}
- public Map<String, IngestionStatsAndErrorsTaskReport> getTaskReport(String
taskId)
+ public Map<String, TaskReport> getTaskReport(String taskId)
{
try {
StatusResponseHolder response = makeRequest(
@@ -291,7 +293,7 @@ public class OverlordResourceTestClient
);
return jsonMapper.readValue(
response.getContent(),
- new TypeReference<Map<String, IngestionStatsAndErrorsTaskReport>>()
+ new TypeReference<Map<String, TaskReport>>()
{
}
);
diff --git
a/integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java
b/integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java
index c9291c45320..f647a75e33a 100644
---
a/integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java
+++
b/integration-tests/src/main/java/org/apache/druid/testing/clients/msq/MsqOverlordResourceTestClient.java
@@ -23,11 +23,11 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.msq.guice.MSQIndexingModule;
-import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.OverlordResourceTestClient;
import org.apache.druid.testing.guice.TestClient;
@@ -54,7 +54,7 @@ public class MsqOverlordResourceTestClient extends
OverlordResourceTestClient
this.jsonMapper.registerModules(new
MSQIndexingModule().getJacksonModules());
}
- public Map<String, MSQTaskReport> getMsqTaskReport(String taskId)
+ public Map<String, TaskReport> getMsqTaskReport(String taskId)
{
try {
StatusResponseHolder response = makeRequest(
@@ -65,7 +65,7 @@ public class MsqOverlordResourceTestClient extends
OverlordResourceTestClient
StringUtils.format("task/%s/reports",
StringUtils.urlEncode(taskId))
)
);
- return jsonMapper.readValue(response.getContent(), new
TypeReference<Map<String, MSQTaskReport>>()
+ return jsonMapper.readValue(response.getContent(), new
TypeReference<Map<String, TaskReport>>()
{
});
}
diff --git
a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
index 3d20e31f566..e98793b3c8c 100644
---
a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
+++
b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
@@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
@@ -185,7 +186,7 @@ public class MsqTestQueryHelper extends
AbstractTestQueryHelper<MsqQueryWithResu
/**
* Fetches status reports for a given task
*/
- public Map<String, MSQTaskReport> fetchStatusReports(String taskId)
+ public Map<String, TaskReport> fetchStatusReports(String taskId)
{
return overlordClient.getMsqTaskReport(taskId);
}
@@ -195,8 +196,8 @@ public class MsqTestQueryHelper extends
AbstractTestQueryHelper<MsqQueryWithResu
*/
private void compareResults(String taskId, MsqQueryWithResults
expectedQueryWithResults)
{
- Map<String, MSQTaskReport> statusReport = fetchStatusReports(taskId);
- MSQTaskReport taskReport = statusReport.get(MSQTaskReport.REPORT_KEY);
+ Map<String, TaskReport> statusReport = fetchStatusReports(taskId);
+ MSQTaskReport taskReport = (MSQTaskReport)
statusReport.get(MSQTaskReport.REPORT_KEY);
if (taskReport == null) {
throw new ISE("Unable to fetch the status report for the task [%]",
taskId);
}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index d419bbb48d3..f4a2cdca4b5 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -25,6 +25,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.partitions.SecondaryPartitionType;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.TaskContextReport;
import org.apache.druid.indexing.common.TaskReport;
import
org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionCardinalityTask;
import
org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask;
@@ -382,6 +383,11 @@ public abstract class AbstractITBatchIndexTest extends
AbstractIndexerTest
segmentAvailabilityConfirmationPair.rhs
);
}
+
+ TaskContextReport taskContextReport =
+ (TaskContextReport)
indexer.getTaskReport(taskID).get(TaskContextReport.REPORT_KEY);
+
+ Assert.assertFalse(taskContextReport.getPayload().isEmpty());
}
// IT*ParallelIndexTest do a second round of ingestion to replace
segements in an existing
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
index 2b37c7a27d2..9bfc2ad02e5 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
@@ -23,6 +23,7 @@ import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexing.common.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.GranularityType;
@@ -170,20 +171,24 @@ public class ITCompactionTaskTest extends
AbstractIndexerTest
checkCompactionIntervals(expectedIntervalAfterCompaction);
- Map<String, IngestionStatsAndErrorsTaskReport> reports =
indexer.getTaskReport(taskId);
+ Map<String, TaskReport> reports = indexer.getTaskReport(taskId);
Assert.assertTrue(reports != null && reports.size() > 0);
- Assert.assertEquals(2,
- reports.values()
- .stream()
- .mapToLong(r -> ((IngestionStatsAndErrors)
r.getPayload()).getSegmentsPublished())
- .sum()
+ Assert.assertEquals(
+ 2,
+ reports.values()
+ .stream()
+ .filter(r -> r instanceof IngestionStatsAndErrorsTaskReport)
+ .mapToLong(r -> ((IngestionStatsAndErrors)
r.getPayload()).getSegmentsPublished())
+ .sum()
);
- Assert.assertEquals(4,
- reports.values()
- .stream()
- .mapToLong(r -> ((IngestionStatsAndErrors)
r.getPayload()).getSegmentsRead())
- .sum()
+ Assert.assertEquals(
+ 4,
+ reports.values()
+ .stream()
+ .filter(r -> r instanceof IngestionStatsAndErrorsTaskReport)
+ .mapToLong(r -> ((IngestionStatsAndErrors)
r.getPayload()).getSegmentsRead())
+ .sum()
);
}
}
@@ -268,7 +273,7 @@ public class ITCompactionTaskTest extends
AbstractIndexerTest
}
checkCompactionIntervals(expectedIntervalAfterCompaction);
- Map<String, IngestionStatsAndErrorsTaskReport> reports =
indexer.getTaskReport(taskId);
+ Map<String, TaskReport> reports = indexer.getTaskReport(taskId);
Assert.assertTrue(reports != null && reports.size() > 0);
}
}
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index f48c9be01b6..771e76561ad 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -62,6 +62,8 @@ import
org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
+import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
+import org.apache.druid.indexing.common.task.TaskContextEnricher;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProvider;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
import org.apache.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
@@ -238,6 +240,18 @@ public class CliOverlord extends ServerRunnable
.in(LazySingleton.class);
binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
+ PolyBind.optionBinder(binder, Key.get(TaskContextEnricher.class))
+ .addBinding(NoopTaskContextEnricher.TYPE)
+ .to(NoopTaskContextEnricher.class)
+ .in(LazySingleton.class);
+
+ PolyBind.createChoiceWithDefault(
+ binder,
+ "druid.indexer.task.contextenricher.type",
+ Key.get(TaskContextEnricher.class),
+ NoopTaskContextEnricher.TYPE
+ );
+
configureTaskStorage(binder);
configureIntermediaryData(binder);
configureAutoscale(binder);
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java
b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 1283809fccb..92699694dda 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -308,13 +308,18 @@ public class CliPeon extends GuiceRunnable
@Named(ServiceStatusMonitor.HEARTBEAT_TAGS_BINDING)
public Supplier<Map<String, Object>> heartbeatDimensions(Task task)
{
+ ImmutableMap.Builder<String, Object> builder =
ImmutableMap.builder();
+ builder.put(DruidMetrics.TASK_ID, task.getId());
+ builder.put(DruidMetrics.DATASOURCE, task.getDataSource());
+ builder.put(DruidMetrics.TASK_TYPE, task.getType());
+ builder.put(DruidMetrics.GROUP_ID, task.getGroupId());
+ Map<String, Object> tags = task.getContextValue(DruidMetrics.TAGS);
+ if (tags != null && !tags.isEmpty()) {
+ builder.put(DruidMetrics.TAGS, tags);
+ }
+
return Suppliers.ofInstance(
- ImmutableMap.of(
- DruidMetrics.TASK_ID, task.getId(),
- DruidMetrics.DATASOURCE, task.getDataSource(),
- DruidMetrics.TASK_TYPE, task.getType(),
- DruidMetrics.GROUP_ID, task.getGroupId()
- )
+ builder.build()
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]