This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 77478f25fb Add taskActionType dimension to task/action/run/time.
(#13333)
77478f25fb is described below
commit 77478f25fb3d33c032b26a1dbcb9ad485a417ea2
Author: Gian Merlino <[email protected]>
AuthorDate: Thu Nov 10 22:30:08 2022 -0800
Add taskActionType dimension to task/action/run/time. (#13333)
* Add taskActionType dimension to task/action/run/time.
* Spelling.
---
docs/operations/metrics.md | 2 +-
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 3 +-
.../indexing/kinesis/KinesisIndexTaskTest.java | 3 +-
.../common/actions/LocalTaskActionClient.java | 32 ++++++++++++++++++++--
.../druid/indexing/common/actions/TaskAction.java | 6 +++-
.../indexing/common/actions/TaskActionToolbox.java | 12 +++++++-
...boxTest.java => LocalTaskActionClientTest.java} | 23 ++++++----------
.../indexing/common/actions/TaskActionTestKit.java | 3 +-
.../common/actions/TaskActionToolboxTest.java | 2 +-
.../AppenderatorDriverRealtimeIndexTaskTest.java | 3 +-
.../indexing/common/task/IngestionTestBase.java | 3 +-
.../common/task/RealtimeIndexTaskTest.java | 3 +-
.../druid/indexing/overlord/TaskLifecycleTest.java | 3 +-
website/.spelling | 1 +
14 files changed, 71 insertions(+), 28 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 5dbdc8d5dd..d4eb951cf7 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -237,7 +237,7 @@ Note: If the JVM does not support CPU time measurement for
the current thread, i
|`task/run/time`|Milliseconds taken to run a task.| dataSource, taskId,
taskType, taskStatus. |Varies.|
|`task/pending/time`|Milliseconds taken for a task to wait for running.|
dataSource, taskId, taskType. |Varies.|
|`task/action/log/time`|Milliseconds taken to log a task action to the audit
log.| dataSource, taskId, taskType |< 1000
(subsecond)|
-|`task/action/run/time`|Milliseconds taken to execute a task action.|
dataSource, taskId, taskType |Varies from
subsecond to a few seconds, based on action type.|
+|`task/action/run/time`|Milliseconds taken to execute a task action.|
dataSource, taskId, taskType, taskActionType
|Varies from subsecond to a few seconds, based on action type.|
|`segment/added/bytes`|Size in bytes of new segments created.| dataSource,
taskId, taskType, interval. |Varies.|
|`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move
Task.| dataSource, taskId, taskType, interval. |Varies.|
|`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.|
dataSource, taskId, taskType, interval. |Varies.|
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 36ddea4c36..a83bbaf515 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -3121,7 +3121,8 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
);
return true;
}
- }
+ },
+ objectMapper
);
final TaskActionClientFactory taskActionClientFactory = new
LocalTaskActionClientFactory(
taskStorage,
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index c59c479569..553b601f7f 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -3085,7 +3085,8 @@ public class KinesisIndexTaskTest extends
SeekableStreamIndexTaskTestBase
);
return true;
}
- }
+ },
+ objectMapper
);
final TaskActionClientFactory taskActionClientFactory = new
LocalTaskActionClientFactory(
taskStorage,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java
index 8c7cb533ae..27e0bcbaa2 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java
@@ -19,13 +19,18 @@
package org.apache.druid.indexing.common.actions;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import javax.annotation.Nullable;
+import java.util.Map;
+
public class LocalTaskActionClient implements TaskActionClient
{
private static final EmittingLogger log = new
EmittingLogger(LocalTaskActionClient.class);
@@ -58,7 +63,7 @@ public class LocalTaskActionClient implements TaskActionClient
try {
final long auditLogStartTime = System.currentTimeMillis();
storage.addAuditLog(task, taskAction);
- emitTimerMetric("task/action/log/time", System.currentTimeMillis() -
auditLogStartTime);
+ emitTimerMetric("task/action/log/time", taskAction,
System.currentTimeMillis() - auditLogStartTime);
}
catch (Exception e) {
final String actionClass = taskAction.getClass().getName();
@@ -72,14 +77,35 @@ public class LocalTaskActionClient implements
TaskActionClient
final long performStartTime = System.currentTimeMillis();
final RetType result = taskAction.perform(task, toolbox);
- emitTimerMetric("task/action/run/time", System.currentTimeMillis() -
performStartTime);
+ emitTimerMetric("task/action/run/time", taskAction,
System.currentTimeMillis() - performStartTime);
return result;
}
- private void emitTimerMetric(final String metric, final long time)
+ private void emitTimerMetric(final String metric, final TaskAction<?>
action, final long time)
{
final ServiceMetricEvent.Builder metricBuilder =
ServiceMetricEvent.builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);
+ final String actionType = getActionType(toolbox.getJsonMapper(), action);
+ if (actionType != null) {
+ metricBuilder.setDimension("taskActionType", actionType);
+ }
toolbox.getEmitter().emit(metricBuilder.build(metric, Math.max(0, time)));
}
+
+ @Nullable
+ static String getActionType(final ObjectMapper jsonMapper, final
TaskAction<?> action)
+ {
+ try {
+ final Map<String, Object> m = jsonMapper.convertValue(action,
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT);
+ final Object typeObject = m.get(TaskAction.TYPE_FIELD);
+ if (typeObject instanceof String) {
+ return (String) typeObject;
+ } else {
+ return null;
+ }
+ }
+ catch (Exception e) {
+ return null;
+ }
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
index 02e5ef5012..559039d96e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.indexing.common.task.Task;
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = TaskAction.TYPE_FIELD)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "lockAcquire", value =
TimeChunkLockAcquireAction.class),
@JsonSubTypes.Type(name = "lockTryAcquire", value =
TimeChunkLockTryAcquireAction.class),
@@ -50,8 +50,12 @@ import org.apache.druid.indexing.common.task.Task;
})
public interface TaskAction<RetType>
{
+ String TYPE_FIELD = "type";
+
TypeReference<RetType> getReturnTypeReference(); // T_T
+
RetType perform(Task task, TaskActionToolbox toolbox);
+
boolean isAudited();
@Override
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java
index 61c6180f4a..134a9bf6c7 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java
@@ -19,8 +19,10 @@
package org.apache.druid.indexing.common.actions;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Json;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskRunner;
@@ -36,6 +38,7 @@ public class TaskActionToolbox
private final IndexerMetadataStorageCoordinator
indexerMetadataStorageCoordinator;
private final ServiceEmitter emitter;
private final SupervisorManager supervisorManager;
+ private final ObjectMapper jsonMapper;
private Optional<TaskRunnerFactory> factory = Optional.absent();
@Inject
@@ -44,7 +47,8 @@ public class TaskActionToolbox
TaskStorage taskStorage,
IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
ServiceEmitter emitter,
- SupervisorManager supervisorManager
+ SupervisorManager supervisorManager,
+ @Json ObjectMapper jsonMapper
)
{
this.taskLockbox = taskLockbox;
@@ -52,6 +56,7 @@ public class TaskActionToolbox
this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
this.emitter = emitter;
this.supervisorManager = supervisorManager;
+ this.jsonMapper = jsonMapper;
}
public TaskLockbox getTaskLockbox()
@@ -79,6 +84,11 @@ public class TaskActionToolbox
return supervisorManager;
}
+ public ObjectMapper getJsonMapper()
+ {
+ return jsonMapper;
+ }
+
@Inject(optional = true)
public void setTaskRunnerFactory(TaskRunnerFactory factory)
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionToolboxTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/LocalTaskActionClientTest.java
similarity index 54%
copy from
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionToolboxTest.java
copy to
indexing-service/src/test/java/org/apache/druid/indexing/common/actions/LocalTaskActionClientTest.java
index 232239ce9b..5530798482 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionToolboxTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/LocalTaskActionClientTest.java
@@ -19,26 +19,21 @@
package org.apache.druid.indexing.common.actions;
-import org.apache.druid.indexing.overlord.ForkingTaskRunner;
-import org.apache.druid.indexing.overlord.ForkingTaskRunnerFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.junit.Assert;
import org.junit.Test;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import java.util.Collections;
-public class TaskActionToolboxTest
+public class LocalTaskActionClientTest
{
+ private final ObjectMapper objectMapper = new DefaultObjectMapper();
@Test
- public void testMakeCodeCoverageHappy()
+ public void testGetActionType()
{
- TaskActionToolbox toolbox = new TaskActionToolbox(null, null, null, null,
null);
- assertFalse(toolbox.getTaskRunner().isPresent());
- ForkingTaskRunnerFactory factory = mock(ForkingTaskRunnerFactory.class);
- when(factory.get()).thenReturn(mock(ForkingTaskRunner.class));
- toolbox.setTaskRunnerFactory(factory);
- assertTrue(toolbox.getTaskRunner().isPresent());
+ final TaskAction<?> action =
SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null,
null);
+ Assert.assertEquals("segmentTransactionalInsert",
LocalTaskActionClient.getActionType(objectMapper, action));
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
index ea2c854c3e..2d6b22732a 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
@@ -104,7 +104,8 @@ public class TaskActionTestKit extends ExternalResource
taskStorage,
metadataStorageCoordinator,
new NoopServiceEmitter(),
- EasyMock.createMock(SupervisorManager.class)
+ EasyMock.createMock(SupervisorManager.class),
+ objectMapper
);
testDerbyConnector.createDataSourceTable();
testDerbyConnector.createPendingSegmentsTable();
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionToolboxTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionToolboxTest.java
index 232239ce9b..d58c1f5df1 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionToolboxTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionToolboxTest.java
@@ -34,7 +34,7 @@ public class TaskActionToolboxTest
@Test
public void testMakeCodeCoverageHappy()
{
- TaskActionToolbox toolbox = new TaskActionToolbox(null, null, null, null,
null);
+ TaskActionToolbox toolbox = new TaskActionToolbox(null, null, null, null,
null, null);
assertFalse(toolbox.getTaskRunner().isPresent());
ForkingTaskRunnerFactory factory = mock(ForkingTaskRunnerFactory.class);
when(factory.get()).thenReturn(mock(ForkingTaskRunner.class));
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index f2a742bdb2..7bff995578 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -1574,7 +1574,8 @@ public class AppenderatorDriverRealtimeIndexTaskTest
extends InitializedNullHand
taskStorage,
mdc,
EMITTER,
- EasyMock.createMock(SupervisorManager.class)
+ EasyMock.createMock(SupervisorManager.class),
+ OBJECT_MAPPER
);
final TaskActionClientFactory taskActionClientFactory = new
LocalTaskActionClientFactory(
taskStorage,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 2d3cb8dbad..599f1ffc18 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -206,7 +206,8 @@ public abstract class IngestionTestBase extends
InitializedNullHandlingTest
taskStorage,
storageCoordinator,
new NoopServiceEmitter(),
- null
+ null,
+ objectMapper
);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index d571bd0bc4..83b8a7bf60 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -915,7 +915,8 @@ public class RealtimeIndexTaskTest extends
InitializedNullHandlingTest
taskStorage,
mdc,
EMITTER,
- EasyMock.createMock(SupervisorManager.class)
+ EasyMock.createMock(SupervisorManager.class),
+ new DefaultObjectMapper()
);
final TaskActionClientFactory taskActionClientFactory = new
LocalTaskActionClientFactory(
taskStorage,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 83cb962874..443f2c6d1e 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -598,7 +598,8 @@ public class TaskLifecycleTest extends
InitializedNullHandlingTest
taskStorage,
mdc,
emitter,
- EasyMock.createMock(SupervisorManager.class)
+ EasyMock.createMock(SupervisorManager.class),
+ mapper
),
new TaskAuditLogConfig(true)
);
diff --git a/website/.spelling b/website/.spelling
index 30300bd6a4..836d03c243 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1631,6 +1631,7 @@ poolName
remoteAddress
segmentAvailabilityConfirmed
serviceName
+taskActionType
taskIngestionMode
taskStatus
taskType
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]