kfaraz commented on code in PR #16616:
URL: https://github.com/apache/druid/pull/16616#discussion_r1643900152
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java:
##########
@@ -45,17 +49,28 @@
public class S3UploadManager
{
private final ExecutorService uploadExecutor;
+ private final ServiceEmitter emitter;
private static final Logger log = new Logger(S3UploadManager.class);
+ // For metrics regarding uploadExecutor.
+ private final AtomicInteger queueSize = new AtomicInteger(0);
+
+ // Metric related constants.
+ private static final String METRIC_PREFIX = "s3upload/threadPool/";
+ private static final String TASK_QUEUED_DURATION_METRIC = METRIC_PREFIX + "taskQueuedDuration";
+ private static final String NUM_TASKS_QUEUED_METRIC = METRIC_PREFIX + "queuedTasks";
+ private static final String TASK_DURATION_METRIC = METRIC_PREFIX + "taskDuration";
Review Comment:
Nit: style suggestion
By convention, most devs keep the constants at the top of the class and then
all the member fields together.
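As a hedged illustration of that convention (the constant and field names mirror the PR, but the executor wiring and class name here are simplified and hypothetical):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the suggested layout: static constants grouped at the top of the
// class, followed by all member fields together.
public class FieldOrderingSketch
{
  // Constants first.
  private static final String METRIC_PREFIX = "s3upload/threadPool/";
  private static final String TASK_QUEUED_DURATION_METRIC = METRIC_PREFIX + "taskQueuedDuration";
  private static final String NUM_TASKS_QUEUED_METRIC = METRIC_PREFIX + "queuedTasks";
  private static final String TASK_DURATION_METRIC = METRIC_PREFIX + "taskDuration";

  // Then the member fields, grouped together.
  private final ExecutorService uploadExecutor = Executors.newSingleThreadExecutor();
  private final AtomicInteger queueSize = new AtomicInteger(0);

  String taskDurationMetric()
  {
    return TASK_DURATION_METRIC;
  }
}
```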
##########
docs/operations/metrics.md:
##########
@@ -298,6 +298,9 @@ If the JVM does not support CPU time measurement for the current thread, `ingest
 |`worker/task/assigned/count`|Number of tasks assigned to an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
 |`worker/task/completed/count`|Number of tasks completed by an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
 |`worker/task/running/count`|Number of tasks running on an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
+|`s3upload/threadPool/taskQueuedDuration`|Milliseconds spent by a task in queue before it starts uploading a part to S3 (in a multi-part upload) when durable storage is enabled.||Varies|
+|`s3upload/threadPool/queuedTasks`|The number of tasks that are currently queued and waiting to upload a part to S3 (in a multi-part upload) when durable storage is enabled.||Varies|
+|`s3upload/threadPool/taskDuration`|The time taken in milliseconds to upload a part to S3 (in a multi-part upload) when durable storage is enabled.|`uploadId`, `partNumber`|Varies|
Review Comment:
Better names:
```
s3upload/chunk/uploadTime
s3upload/chunk/queuedTime
s3upload/chunk/queueSize
```
Please also emit a metric from `RetryableS3OutputStream` that tracks the total time taken to upload all chunks of a job, say `s3upload/job/totalTime`. That would give us a good measure of the parallelism: parallelism is working well if
```
s3upload/job/totalTime < sum(s3upload/chunk/uploadTime)
```
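The inequality above reduces to plain arithmetic; as a sketch with made-up timings (the class and method names here are illustrative, not from the PR):

```java
// If chunks uploaded strictly serially, the job's wall-clock time would be
// roughly the sum of per-chunk upload times; a noticeably smaller wall-clock
// time indicates the uploads overlapped.
public class ParallelismCheck
{
  static boolean isParallel(long jobTotalTimeMillis, long[] chunkUploadTimeMillis)
  {
    long sum = 0;
    for (long t : chunkUploadTimeMillis) {
      sum += t;
    }
    return jobTotalTimeMillis < sum;
  }

  public static void main(String[] args)
  {
    // Four chunks totalling 455 ms of upload work finished in 180 ms of
    // wall-clock time, so the uploads must have overlapped.
    System.out.println(isParallel(180, new long[]{120, 95, 110, 130})); // true
  }
}
```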
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java:
##########
@@ -87,25 +102,36 @@ public Future<UploadPartResult> queueChunkForUpload(
       S3OutputConfig config
   )
   {
-    return uploadExecutor.submit(() -> RetryUtils.retry(
-        () -> {
-          log.debug("Uploading chunk[%d] for uploadId[%s].", chunkNumber, uploadId);
-          UploadPartResult uploadPartResult = uploadPartIfPossible(
-              s3Client,
-              uploadId,
-              config.getBucket(),
-              key,
-              chunkNumber,
-              chunkFile
-          );
-          if (!chunkFile.delete()) {
-            log.warn("Failed to delete chunk [%s]", chunkFile.getAbsolutePath());
-          }
-          return uploadPartResult;
-        },
-        S3Utils.S3RETRY,
-        config.getMaxRetry()
-    ));
+    final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
+    final Stopwatch taskQueuedStopwatch = Stopwatch.createStarted();
+    queueSize.incrementAndGet();
+    return uploadExecutor.submit(() -> {
+      emitter.emit(builder.setMetric(TASK_QUEUED_DURATION_METRIC, taskQueuedStopwatch.millisElapsed()));
+      emitter.emit(builder.setMetric(NUM_TASKS_QUEUED_METRIC, queueSize.decrementAndGet()));
+      final Stopwatch taskDurationStopwatch = Stopwatch.createStarted();
+      return RetryUtils.retry(
+          () -> {
+            log.debug("Uploading chunk[%d] for uploadId[%s].", chunkNumber, uploadId);
+            UploadPartResult uploadPartResult = uploadPartIfPossible(
+                s3Client,
+                uploadId,
+                config.getBucket(),
+                key,
+                chunkNumber,
+                chunkFile
+            );
+            if (!chunkFile.delete()) {
+              log.warn("Failed to delete chunk [%s]", chunkFile.getAbsolutePath());
+            }
+            emitter.emit(builder.setMetric(TASK_DURATION_METRIC, taskDurationStopwatch.millisElapsed())
+                                .setDimension("uploadId", uploadId)
+                                .setDimension("partNumber", chunkNumber));
Review Comment:
You can include these dimensions in the queued time metric as well. Also,
the dimensions need to be set on the builder before calling `emitter.emit()`.
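A minimal stand-in, not Druid's real emitter API, sketching why the ordering matters if `setMetric()` snapshots the builder's dimensions into an event:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical builder: setDimension() accumulates state and is chainable,
// while setMetric() copies the current dimensions into the event. Dimensions
// set after setMetric() therefore never reach the emitted event.
class MetricBuilderSketch
{
  private final Map<String, Object> dims = new HashMap<>();

  MetricBuilderSketch setDimension(String key, Object value)
  {
    dims.put(key, value);
    return this;
  }

  Map<String, Object> setMetric(String name, Number value)
  {
    // Snapshot: the returned event is detached from the builder.
    Map<String, Object> event = new HashMap<>(dims);
    event.put("metric", name);
    event.put("value", value);
    return event;
  }
}
```

Under this model, `builder.setDimension("uploadId", uploadId).setMetric(...)` carries the dimension, while adding the dimension after `setMetric(...)` does not.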
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java:
##########
@@ -87,25 +102,36 @@ public Future<UploadPartResult> queueChunkForUpload(
       S3OutputConfig config
   )
   {
-    return uploadExecutor.submit(() -> RetryUtils.retry(
-        () -> {
-          log.debug("Uploading chunk[%d] for uploadId[%s].", chunkNumber, uploadId);
-          UploadPartResult uploadPartResult = uploadPartIfPossible(
-              s3Client,
-              uploadId,
-              config.getBucket(),
-              key,
-              chunkNumber,
-              chunkFile
-          );
-          if (!chunkFile.delete()) {
-            log.warn("Failed to delete chunk [%s]", chunkFile.getAbsolutePath());
-          }
-          return uploadPartResult;
-        },
-        S3Utils.S3RETRY,
-        config.getMaxRetry()
-    ));
+    final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
+    final Stopwatch taskQueuedStopwatch = Stopwatch.createStarted();
+    queueSize.incrementAndGet();
+    return uploadExecutor.submit(() -> {
+      emitter.emit(builder.setMetric(TASK_QUEUED_DURATION_METRIC, taskQueuedStopwatch.millisElapsed()));
+      emitter.emit(builder.setMetric(NUM_TASKS_QUEUED_METRIC, queueSize.decrementAndGet()));
+      final Stopwatch taskDurationStopwatch = Stopwatch.createStarted();
Review Comment:
Tip: you may `reset()` the same stopwatch `taskQueuedStopwatch` and reuse it for the task duration metric.
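A rough sketch of the reuse pattern with a stand-in stopwatch (Druid ships its own `Stopwatch` utility whose exact API may differ; only the reset-and-reuse idea is the point here):

```java
// Hypothetical minimal stopwatch mimicking the calls used in the PR. One
// instance times the queued period, is reset, and then times the upload.
class StopwatchSketch
{
  private long startNanos = System.nanoTime();

  static StopwatchSketch createStarted()
  {
    return new StopwatchSketch();
  }

  long millisElapsed()
  {
    return (System.nanoTime() - startNanos) / 1_000_000;
  }

  void reset()
  {
    startNanos = System.nanoTime();
  }

  public static void main(String[] args) throws InterruptedException
  {
    StopwatchSketch stopwatch = StopwatchSketch.createStarted();
    Thread.sleep(50);
    long queuedMillis = stopwatch.millisElapsed(); // time spent "queued"
    stopwatch.reset();                             // reuse for the next phase
    Thread.sleep(50);
    long uploadMillis = stopwatch.millisElapsed(); // time spent "uploading"
    System.out.println("queued=" + queuedMillis + "ms upload=" + uploadMillis + "ms");
  }
}
```

Reusing one instance avoids allocating a second stopwatch per queued chunk.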
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]