This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a738ac9ad7 Improve task pause logging and metrics for streaming
ingestion (#13313)
a738ac9ad7 is described below
commit a738ac9ad77194e66ff48d4253664649f78925b8
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Mon Nov 7 21:33:54 2022 +0530
Improve task pause logging and metrics for streaming ingestion (#13313)
* Improve task pause logging and metrics for streaming ingestion
* Add metrics doc
* Fix spelling
---
docs/operations/metrics.md | 3 ++-
.../indexing/seekablestream/SeekableStreamIndexTaskRunner.java | 6 ++++--
.../seekablestream/supervisor/SeekableStreamSupervisor.java | 4 +++-
3 files changed, 9 insertions(+), 4 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index ca155fff7d..7f91379043 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -221,7 +221,8 @@ batch ingestion emit the following metrics. These metrics
are deltas for each em
|`ingest/sink/count`|Number of sinks not handoffed.|dataSource, taskId,
taskType.|1~3|
|`ingest/events/messageGap`|Time gap in milliseconds between the latest
ingested event timestamp and the current system timestamp of metrics emission.
|dataSource, taskId, taskType.|Greater than 0, depends on the time carried in
event |
|`ingest/notices/queueSize`|Number of pending notices to be processed by the
coordinator|dataSource.|Typically 0 and occasionally in lower single digits.
Should not be a very high number. |
-|`ingest/notices/time`|Milliseconds taken to process a notice by the
supervisor|dataSource, noticeType.| < 1s. |
+|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor|dataSource| < 1s. |
+|`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|dataSource, taskId| < 10 seconds.|
Note: If the JVM does not support CPU time measurement for the current thread,
ingest/merge/cpu and ingest/persists/cpu will be 0.
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index cc6d8e61e6..2d85eb4407 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1311,14 +1311,16 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
status = Status.PAUSED;
hasPaused.signalAll();
- log.debug("Received pause command, pausing ingestion until resumed.");
+ long pauseTime = System.currentTimeMillis();
+ log.info("Received pause command, pausing ingestion until resumed.");
while (pauseRequested) {
shouldResume.await();
}
status = Status.READING;
shouldResume.signalAll();
- log.debug("Received resume command, resuming ingestion.");
+ log.info("Received resume command, resuming ingestion.");
+ task.emitMetric(toolbox.getEmitter(), "ingest/pause/time", System.currentTimeMillis() - pauseTime);
return true;
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 2754fe72f2..9c90cfe983 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -3064,7 +3064,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
log.info(
- "Setting endOffsets for tasks in taskGroup [%d] to %s and
resuming",
+ "Setting endOffsets for tasks in taskGroup [%d] to %s",
taskGroup.groupId,
endOffsets
);
@@ -3083,6 +3083,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
taskId
);
taskGroup.tasks.remove(taskId);
+ } else {
+ log.info("Successfully set endOffsets for task[%s] and resumed it", setEndOffsetTaskIds.get(i));
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]