kfaraz commented on code in PR #16616:
URL: https://github.com/apache/druid/pull/16616#discussion_r1643900152


##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java:
##########
@@ -45,17 +49,28 @@
 public class S3UploadManager
 {
   private final ExecutorService uploadExecutor;
+  private final ServiceEmitter emitter;
 
   private static final Logger log = new Logger(S3UploadManager.class);
 
+  // For metrics regarding uploadExecutor.
+  private final AtomicInteger queueSize = new AtomicInteger(0);
+
+  // Metric related constants.
+  private static final String METRIC_PREFIX = "s3upload/threadPool/";
+  private static final String TASK_QUEUED_DURATION_METRIC = METRIC_PREFIX + "taskQueuedDuration";
+  private static final String NUM_TASKS_QUEUED_METRIC = METRIC_PREFIX + "queuedTasks";
+  private static final String TASK_DURATION_METRIC = METRIC_PREFIX + "taskDuration";

Review Comment:
   Nit: style suggestion
   
   By convention, most devs keep the constants at the top of the class and then all the member fields together.
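
A minimal sketch of that layout (a pure-Java stand-in; the Druid `Logger` and executor fields from the actual class are omitted, and the constants are widened to package-private only to make the sketch easy to exercise):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the suggested layout: constants grouped at the top,
// member fields grouped together below them.
class S3UploadManager
{
  // Metric related constants.
  private static final String METRIC_PREFIX = "s3upload/threadPool/";
  static final String TASK_QUEUED_DURATION_METRIC = METRIC_PREFIX + "taskQueuedDuration";
  static final String NUM_TASKS_QUEUED_METRIC = METRIC_PREFIX + "queuedTasks";
  static final String TASK_DURATION_METRIC = METRIC_PREFIX + "taskDuration";

  // Member fields, kept together after the constants.
  private final AtomicInteger queueSize = new AtomicInteger(0);
}
```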



##########
docs/operations/metrics.md:
##########
@@ -298,6 +298,9 @@ If the JVM does not support CPU time measurement for the current thread, `ingest
 |`worker/task/assigned/count`|Number of tasks assigned to an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
 |`worker/task/completed/count`|Number of tasks completed by an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
 |`worker/task/running/count`|Number of tasks running on an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
+|`s3upload/threadPool/taskQueuedDuration`|Milliseconds spent by a task in the queue before it starts uploading a part to S3 (in a multi-part upload) when durable storage is enabled.||Varies|
+|`s3upload/threadPool/queuedTasks`|The number of tasks currently queued and waiting to upload a part to S3 (in a multi-part upload) when durable storage is enabled.||Varies|
+|`s3upload/threadPool/taskDuration`|The time taken in milliseconds to upload a part to S3 (in a multi-part upload) when durable storage is enabled.|`uploadId`, `partNumber`|Varies|

Review Comment:
   Better names:
   
   ```
   s3upload/chunk/uploadTime
   s3upload/chunk/queuedTime
   s3upload/chunk/queueSize
   ```
   
   Please also emit a metric from `RetryableS3OutputStream` that tracks the total time taken to upload all chunks of a job, say `s3upload/job/totalTime`. That would give us a good measure of the parallelism:
   
   parallelism is good if
   
   ```
   s3upload/job/totalTime < sum(s3upload/chunk/uploadTime)
   ```
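
The inequality above can be checked mechanically. A hypothetical helper (not part of the PR) that flags whether chunk uploads must have overlapped in time:

```java
import java.util.List;

// Hypothetical helper: if the job's wall-clock total is less than the sum of
// per-chunk upload times, at least some chunks were uploaded in parallel.
class UploadParallelismCheck
{
  static boolean uploadsOverlapped(long jobTotalTimeMillis, List<Long> chunkUploadTimesMillis)
  {
    long sum = chunkUploadTimesMillis.stream().mapToLong(Long::longValue).sum();
    return jobTotalTimeMillis < sum;
  }
}
```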



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java:
##########
@@ -87,25 +102,36 @@ public Future<UploadPartResult> queueChunkForUpload(
       S3OutputConfig config
   )
   {
-    return uploadExecutor.submit(() -> RetryUtils.retry(
-        () -> {
-          log.debug("Uploading chunk[%d] for uploadId[%s].", chunkNumber, uploadId);
-          UploadPartResult uploadPartResult = uploadPartIfPossible(
-              s3Client,
-              uploadId,
-              config.getBucket(),
-              key,
-              chunkNumber,
-              chunkFile
-          );
-          if (!chunkFile.delete()) {
-            log.warn("Failed to delete chunk [%s]", chunkFile.getAbsolutePath());
-          }
-          return uploadPartResult;
-        },
-        S3Utils.S3RETRY,
-        config.getMaxRetry()
-    ));
+    final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
+    final Stopwatch taskQueuedStopwatch = Stopwatch.createStarted();
+    queueSize.incrementAndGet();
+    return uploadExecutor.submit(() -> {
+      emitter.emit(builder.setMetric(TASK_QUEUED_DURATION_METRIC, taskQueuedStopwatch.millisElapsed()));
+      emitter.emit(builder.setMetric(NUM_TASKS_QUEUED_METRIC, queueSize.decrementAndGet()));
+      final Stopwatch taskDurationStopwatch = Stopwatch.createStarted();
+      return RetryUtils.retry(
+          () -> {
+            log.debug("Uploading chunk[%d] for uploadId[%s].", chunkNumber, uploadId);
+            UploadPartResult uploadPartResult = uploadPartIfPossible(
+                s3Client,
+                uploadId,
+                config.getBucket(),
+                key,
+                chunkNumber,
+                chunkFile
+            );
+            if (!chunkFile.delete()) {
+              log.warn("Failed to delete chunk [%s]", chunkFile.getAbsolutePath());
+            }
+            emitter.emit(builder.setMetric(TASK_DURATION_METRIC, taskDurationStopwatch.millisElapsed())
+                                .setDimension("uploadId", uploadId)
+                                .setDimension("partNumber", chunkNumber));

Review Comment:
   You can include these dimensions in the queued time metric as well. Also, the dimensions need to be set on the builder before calling `emitter.emit()`.
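
To illustrate why the ordering matters, here is a hypothetical stand-in builder (not Druid's actual `ServiceMetricEvent.Builder`) in which building the metric snapshots the dimensions set so far, so dimensions added afterwards never reach the emitted event:

```java
import java.util.HashMap;
import java.util.Map;

// Immutable event: captures a copy of the dimensions at build time.
class MetricEvent
{
  final String metric;
  final Number value;
  final Map<String, Object> dimensions;

  MetricEvent(String metric, Number value, Map<String, Object> dimensions)
  {
    this.metric = metric;
    this.value = value;
    this.dimensions = Map.copyOf(dimensions);
  }
}

// Hypothetical builder: setMetric() snapshots the dimensions set so far.
class MetricBuilder
{
  private final Map<String, Object> dimensions = new HashMap<>();

  MetricBuilder setDimension(String key, Object value)
  {
    dimensions.put(key, value);
    return this;
  }

  MetricEvent setMetric(String metric, Number value)
  {
    return new MetricEvent(metric, value, dimensions);
  }
}
```

With such a builder, the safe order is `setDimension(...)` first, then `setMetric(...)`, then emit the result.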



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java:
##########
@@ -87,25 +102,36 @@ public Future<UploadPartResult> queueChunkForUpload(
       S3OutputConfig config
   )
   {
-    return uploadExecutor.submit(() -> RetryUtils.retry(
-        () -> {
-          log.debug("Uploading chunk[%d] for uploadId[%s].", chunkNumber, uploadId);
-          UploadPartResult uploadPartResult = uploadPartIfPossible(
-              s3Client,
-              uploadId,
-              config.getBucket(),
-              key,
-              chunkNumber,
-              chunkFile
-          );
-          if (!chunkFile.delete()) {
-            log.warn("Failed to delete chunk [%s]", chunkFile.getAbsolutePath());
-          }
-          return uploadPartResult;
-        },
-        S3Utils.S3RETRY,
-        config.getMaxRetry()
-    ));
+    final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
+    final Stopwatch taskQueuedStopwatch = Stopwatch.createStarted();
+    queueSize.incrementAndGet();
+    return uploadExecutor.submit(() -> {
+      emitter.emit(builder.setMetric(TASK_QUEUED_DURATION_METRIC, taskQueuedStopwatch.millisElapsed()));
+      emitter.emit(builder.setMetric(NUM_TASKS_QUEUED_METRIC, queueSize.decrementAndGet()));
+      final Stopwatch taskDurationStopwatch = Stopwatch.createStarted();

Review Comment:
   Tip: you may `reset()` the same stopwatch `taskQueuedStopwatch` and reuse it for the task duration metric.
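
The reuse pattern can be sketched with a plain-Java stand-in stopwatch (assumed restart semantics; not the `Stopwatch` class used in the diff):

```java
// Stand-in stopwatch showing the reset-and-reuse pattern from the tip:
// one instance measures queued time, is restarted, then measures task duration.
class SimpleStopwatch
{
  private long startNanos = System.nanoTime();

  long millisElapsed()
  {
    return (System.nanoTime() - startNanos) / 1_000_000L;
  }

  // Equivalent of reset() + start(): begin a fresh measurement.
  void restart()
  {
    startNanos = System.nanoTime();
  }
}
```

Queued time would be read once the task starts, then `restart()` is called and the same instance read again after the upload completes.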



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

