This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a738ac9ad7 Improve task pause logging and metrics for streaming ingestion (#13313)
a738ac9ad7 is described below

commit a738ac9ad77194e66ff48d4253664649f78925b8
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Mon Nov 7 21:33:54 2022 +0530

    Improve task pause logging and metrics for streaming ingestion (#13313)
    
    * Improve task pause logging and metrics for streaming ingestion
    
    * Add metrics doc
    
    * Fix spelling
---
 docs/operations/metrics.md                                          | 3 ++-
 .../indexing/seekablestream/SeekableStreamIndexTaskRunner.java      | 6 ++++--
 .../seekablestream/supervisor/SeekableStreamSupervisor.java         | 4 +++-
 3 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index ca155fff7d..7f91379043 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -221,7 +221,8 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
 |`ingest/sink/count`|Number of sinks not handoffed.|dataSource, taskId, taskType.|1~3|
 |`ingest/events/messageGap`|Time gap in milliseconds between the latest ingested event timestamp and the current system timestamp of metrics emission. |dataSource, taskId, taskType.|Greater than 0, depends on the time carried in event |
 |`ingest/notices/queueSize`|Number of pending notices to be processed by the coordinator|dataSource.|Typically 0 and occasionally in lower single digits. Should not be a very high number. |
-|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor|dataSource, noticeType.| < 1s. |
+|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor|dataSource| < 1s. |
+|`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|dataSource, taskId| < 10 seconds.|
 
 
 Note: If the JVM does not support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0.
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index cc6d8e61e6..2d85eb4407 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1311,14 +1311,16 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         status = Status.PAUSED;
         hasPaused.signalAll();
 
-        log.debug("Received pause command, pausing ingestion until resumed.");
+        long pauseTime = System.currentTimeMillis();
+        log.info("Received pause command, pausing ingestion until resumed.");
         while (pauseRequested) {
           shouldResume.await();
         }
 
         status = Status.READING;
         shouldResume.signalAll();
-        log.debug("Received resume command, resuming ingestion.");
+        log.info("Received resume command, resuming ingestion.");
+        task.emitMetric(toolbox.getEmitter(), "ingest/pause/time", System.currentTimeMillis() - pauseTime);
         return true;
       }
     }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 2754fe72f2..9c90cfe983 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -3064,7 +3064,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
               }
 
               log.info(
-                  "Setting endOffsets for tasks in taskGroup [%d] to %s and resuming",
+                  "Setting endOffsets for tasks in taskGroup [%d] to %s",
                   taskGroup.groupId,
                   endOffsets
               );
@@ -3083,6 +3083,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                       taskId
                   );
                   taskGroup.tasks.remove(taskId);
+                } else {
+                  log.info("Successfully set endOffsets for task[%s] and resumed it", setEndOffsetTaskIds.get(i));
                 }
               }
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to