This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new cd438b1918c Emit metrics for S3UploadThreadPool (#16616)
cd438b1918c is described below

commit cd438b1918c1bf925e76ba97b6ce076cde831df4
Author: Akshat Jain <[email protected]>
AuthorDate: Fri Jun 21 11:36:47 2024 +0530

    Emit metrics for S3UploadThreadPool (#16616)
    
    * Emit metrics for S3UploadThreadPool
    
    * Address review comments
    
    * Revert unnecessary formatting change
    
    * Revert unnecessary formatting change in metrics.md file
    
    * Address review comments
    
    * Add metric for task duration
    
    * Minor fix in metrics.md
    
    * Add s3Key and uploadId in the log message
    
    * Address review comments
    
    * Create new instance of ServiceMetricEvent.Builder for thread safety
    
    * Address review comments
    
    * Address review comments
---
 docs/operations/metrics.md                         | 13 ++++
 .../storage/s3/output/RetryableS3OutputStream.java | 22 +++++--
 .../druid/storage/s3/output/S3UploadManager.java   | 75 ++++++++++++++++------
 .../storage/s3/S3StorageConnectorProviderTest.java |  4 +-
 .../s3/output/RetryableS3OutputStreamTest.java     |  4 +-
 .../storage/s3/output/S3StorageConnectorTest.java  |  4 +-
 .../storage/s3/output/S3UploadManagerTest.java     |  9 ++-
 .../java/util/metrics/StubServiceEmitter.java      |  4 +-
 8 files changed, 104 insertions(+), 31 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index bf241ac5708..1d37169684e 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -508,6 +508,19 @@ These metrics are only available if the `OshiSysMonitor` 
module is included.
 |`sys/tcpv4/out/rsts`|Total "out reset" packets sent to reset the 
connection||Generally 0|
 |`sys/tcpv4/retrans/segs`|Total segments re-transmitted||Varies|
 
+
+## S3 multi-part upload
+
+These metrics are only available if the `druid-s3-extensions` module is included and one of the following features is in use: MSQ export to S3, or durable intermediate storage on S3.
+
+|Metric|Description|Dimensions|Normal value|
+|------|-----------|----------|------------|
+|`s3/upload/part/queueSize`|Number of items currently waiting in queue to be 
uploaded to S3. Each item in the queue corresponds to a single part in a 
multi-part upload.||Varies|
+|`s3/upload/part/queuedTime`|Milliseconds spent by a single item (or part) in the queue before its upload to S3 begins.|`uploadId`, `partNumber`|Varies|
+|`s3/upload/part/time`|Milliseconds taken to upload a single part of a 
multi-part upload to S3.|`uploadId`, `partNumber`|Varies|
+|`s3/upload/total/time`|Milliseconds taken for uploading all parts of a 
multi-part upload to S3.|`uploadId`|Varies|
+|`s3/upload/total/bytes`|Total bytes uploaded to S3 during a multi-part 
upload.|`uploadId`|Varies|
+
 ## Cgroup
 
 These metrics are available on operating systems with the cgroup kernel 
feature. All the values are derived by reading from `/sys/fs/cgroup`.
diff --git 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java
 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java
index d0e5d0ee3ff..aa672444581 100644
--- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java
+++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java
@@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.storage.s3.S3Utils;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
 
@@ -69,6 +70,11 @@ import java.util.concurrent.TimeUnit;
  */
 public class RetryableS3OutputStream extends OutputStream
 {
+  // Metric related constants.
+  private static final String METRIC_PREFIX = "s3/upload/total/";
+  private static final String METRIC_TOTAL_UPLOAD_TIME = METRIC_PREFIX + 
"time";
+  private static final String METRIC_TOTAL_UPLOAD_BYTES = METRIC_PREFIX + 
"bytes";
+
   private static final Logger LOG = new Logger(RetryableS3OutputStream.class);
 
   private final S3OutputConfig config;
@@ -208,14 +214,20 @@ public class RetryableS3OutputStream extends OutputStream
       org.apache.commons.io.FileUtils.forceDelete(chunkStorePath);
       LOG.info("Deleted chunkStorePath[%s]", chunkStorePath);
 
-      // This should be emitted as a metric
-      long totalChunkSize = (currentChunk.id - 1) * chunkSize + 
currentChunk.length();
+      final long totalBytesUploaded = (currentChunk.id - 1) * chunkSize + 
currentChunk.length();
+      final long totalUploadTimeMillis = 
pushStopwatch.elapsed(TimeUnit.MILLISECONDS);
       LOG.info(
-          "Pushed total [%d] parts containing [%d] bytes in [%d]ms.",
+          "Pushed total [%d] parts containing [%d] bytes in [%d]ms for 
s3Key[%s], uploadId[%s].",
           futures.size(),
-          totalChunkSize,
-          pushStopwatch.elapsed(TimeUnit.MILLISECONDS)
+          totalBytesUploaded,
+          totalUploadTimeMillis,
+          s3Key,
+          uploadId
       );
+
+      final ServiceMetricEvent.Builder builder = new 
ServiceMetricEvent.Builder().setDimension("uploadId", uploadId);
+      uploadManager.emitMetric(builder.setMetric(METRIC_TOTAL_UPLOAD_TIME, 
totalUploadTimeMillis));
+      uploadManager.emitMetric(builder.setMetric(METRIC_TOTAL_UPLOAD_BYTES, 
totalBytesUploaded));
     });
 
     try (Closer ignored = closer) {
diff --git 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java
 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java
index 9caa2bcb2e3..cc9ce4bf15a 100644
--- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java
+++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java
@@ -25,10 +25,13 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Inject;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.storage.s3.S3Utils;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
 import org.apache.druid.utils.RuntimeInfo;
@@ -36,6 +39,7 @@ import org.apache.druid.utils.RuntimeInfo;
 import java.io.File;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * This class manages uploading files to S3 in chunks, while ensuring that the
@@ -44,18 +48,34 @@ import java.util.concurrent.Future;
 @ManageLifecycle
 public class S3UploadManager
 {
+  // Metric related constants.
+  private static final String METRIC_PREFIX = "s3/upload/part/";
+  private static final String METRIC_PART_QUEUED_TIME = METRIC_PREFIX + 
"queuedTime";
+  private static final String METRIC_QUEUE_SIZE = METRIC_PREFIX + "queueSize";
+  private static final String METRIC_PART_UPLOAD_TIME = METRIC_PREFIX + "time";
+
   private final ExecutorService uploadExecutor;
+  private final ServiceEmitter emitter;
 
   private static final Logger log = new Logger(S3UploadManager.class);
 
+  // For metrics regarding uploadExecutor.
+  private final AtomicInteger executorQueueSize = new AtomicInteger(0);
+
   @Inject
-  public S3UploadManager(S3OutputConfig s3OutputConfig, S3ExportConfig 
s3ExportConfig, RuntimeInfo runtimeInfo)
+  public S3UploadManager(
+      S3OutputConfig s3OutputConfig,
+      S3ExportConfig s3ExportConfig,
+      RuntimeInfo runtimeInfo,
+      ServiceEmitter emitter
+  )
   {
     int poolSize = Math.max(4, runtimeInfo.getAvailableProcessors());
     int maxNumChunksOnDisk = computeMaxNumChunksOnDisk(s3OutputConfig, 
s3ExportConfig);
     this.uploadExecutor = createExecutorService(poolSize, maxNumChunksOnDisk);
     log.info("Initialized executor service for S3 multipart upload with pool 
size [%d] and work queue capacity [%d]",
              poolSize, maxNumChunksOnDisk);
+    this.emitter = emitter;
   }
 
   /**
@@ -87,25 +107,36 @@ public class S3UploadManager
       S3OutputConfig config
   )
   {
-    return uploadExecutor.submit(() -> RetryUtils.retry(
-        () -> {
-          log.debug("Uploading chunk[%d] for uploadId[%s].", chunkNumber, 
uploadId);
-          UploadPartResult uploadPartResult = uploadPartIfPossible(
-              s3Client,
-              uploadId,
-              config.getBucket(),
-              key,
-              chunkNumber,
-              chunkFile
-          );
-          if (!chunkFile.delete()) {
-            log.warn("Failed to delete chunk [%s]", 
chunkFile.getAbsolutePath());
-          }
-          return uploadPartResult;
-        },
-        S3Utils.S3RETRY,
-        config.getMaxRetry()
-    ));
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    executorQueueSize.incrementAndGet();
+    return uploadExecutor.submit(() -> {
+      final ServiceMetricEvent.Builder metricBuilder = new 
ServiceMetricEvent.Builder();
+      emitMetric(metricBuilder.setMetric(METRIC_QUEUE_SIZE, 
executorQueueSize.decrementAndGet()));
+      metricBuilder.setDimension("uploadId", 
uploadId).setDimension("partNumber", chunkNumber);
+      emitMetric(metricBuilder.setMetric(METRIC_PART_QUEUED_TIME, 
stopwatch.millisElapsed()));
+      stopwatch.restart();
+
+      return RetryUtils.retry(
+          () -> {
+            log.debug("Uploading chunk[%d] for uploadId[%s].", chunkNumber, 
uploadId);
+            UploadPartResult uploadPartResult = uploadPartIfPossible(
+                s3Client,
+                uploadId,
+                config.getBucket(),
+                key,
+                chunkNumber,
+                chunkFile
+            );
+            if (!chunkFile.delete()) {
+              log.warn("Failed to delete chunk [%s]", 
chunkFile.getAbsolutePath());
+            }
+            emitMetric(metricBuilder.setMetric(METRIC_PART_UPLOAD_TIME, 
stopwatch.millisElapsed()));
+            return uploadPartResult;
+          },
+          S3Utils.S3RETRY,
+          config.getMaxRetry()
+      );
+    });
   }
 
   @VisibleForTesting
@@ -149,4 +180,8 @@ public class S3UploadManager
     uploadExecutor.shutdown();
   }
 
+  protected void emitMetric(ServiceMetricEvent.Builder builder)
+  {
+    emitter.emit(builder);
+  }
 }
diff --git 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java
 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java
index 676352daf4f..a880d6f2efa 100644
--- 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java
+++ 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.StartupInjectorBuilder;
 import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.query.DruidProcessingConfigTest;
 import org.apache.druid.storage.StorageConnector;
 import org.apache.druid.storage.StorageConnectorModule;
@@ -158,7 +159,8 @@ public class S3StorageConnectorProviderTest
                 new S3UploadManager(
                     new S3OutputConfig("bucket", "prefix", 
EasyMock.mock(File.class), new HumanReadableBytes("5MiB"), 1),
                     new S3ExportConfig("tempDir", new 
HumanReadableBytes("5MiB"), 1, null),
-                    new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0))
+                    new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0),
+                    new StubServiceEmitter())
             )
     );
 
diff --git 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java
 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java
index 8e7a81eb48d..8d15624c0d0 100644
--- 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java
+++ 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java
@@ -33,6 +33,7 @@ import com.amazonaws.services.s3.model.UploadPartResult;
 import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.java.util.common.IOE;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.query.DruidProcessingConfigTest;
 import org.apache.druid.storage.s3.NoopServerSideEncryption;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
@@ -105,7 +106,8 @@ public class RetryableS3OutputStreamTest
     s3UploadManager = new S3UploadManager(
         new S3OutputConfig("bucket", "prefix", EasyMock.mock(File.class), new 
HumanReadableBytes("5MiB"), 1),
         new S3ExportConfig("tempDir", new HumanReadableBytes("5MiB"), 1, null),
-        new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0));
+        new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0),
+        new StubServiceEmitter());
   }
 
   @Test
diff --git 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java
 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java
index 67dcb3b6db6..68eaca1c42a 100644
--- 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java
+++ 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java
@@ -32,6 +32,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.query.DruidProcessingConfigTest;
 import org.apache.druid.storage.StorageConnector;
 import org.apache.druid.storage.s3.NoopServerSideEncryption;
@@ -90,7 +91,8 @@ public class S3StorageConnectorTest
       storageConnector = new S3StorageConnector(s3OutputConfig, service, new 
S3UploadManager(
           s3OutputConfig,
           new S3ExportConfig("tempDir", new HumanReadableBytes("5MiB"), 1, 
null),
-          new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0)));
+          new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0),
+          new StubServiceEmitter()));
     }
     catch (IOException e) {
       throw new RuntimeException(e);
diff --git 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3UploadManagerTest.java
 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3UploadManagerTest.java
index b79c392844d..75305a3c95a 100644
--- 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3UploadManagerTest.java
+++ 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3UploadManagerTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.storage.s3.output;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
 import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.query.DruidProcessingConfigTest;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
 import org.apache.druid.utils.RuntimeInfo;
@@ -43,14 +44,16 @@ public class S3UploadManagerTest
   private S3UploadManager s3UploadManager;
   private S3OutputConfig s3OutputConfig;
   private S3ExportConfig s3ExportConfig;
+  private StubServiceEmitter serviceEmitter;
 
   @Before
   public void setUp()
   {
     s3OutputConfig = new S3OutputConfig("bucket", "prefix", 
EasyMock.mock(File.class), new HumanReadableBytes("100MiB"), 1);
     s3ExportConfig = new S3ExportConfig("tempDir", new 
HumanReadableBytes("200MiB"), 1, null);
+    serviceEmitter = new StubServiceEmitter();
     final RuntimeInfo runtimeInfo = new 
DruidProcessingConfigTest.MockRuntimeInfo(8, 0, 0);
-    s3UploadManager = new S3UploadManager(s3OutputConfig, s3ExportConfig, 
runtimeInfo);
+    s3UploadManager = new S3UploadManager(s3OutputConfig, s3ExportConfig, 
runtimeInfo, serviceEmitter);
   }
 
   @Test
@@ -75,6 +78,10 @@ public class S3UploadManagerTest
     UploadPartResult futureResult = result.get();
     Assert.assertEquals(chunkId, futureResult.getPartNumber());
     Assert.assertEquals("etag", futureResult.getETag());
+
+    serviceEmitter.verifyEmitted("s3/upload/part/queuedTime", 1);
+    serviceEmitter.verifyEmitted("s3/upload/part/queueSize", 1);
+    serviceEmitter.verifyEmitted("s3/upload/part/time", 1);
   }
 
   @Test
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
 
b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
index 2ddba7c6cd8..e4a8b9403dd 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
@@ -26,9 +26,9 @@ import 
org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Test implementation of {@link ServiceEmitter} that collects emitted metrics
@@ -38,7 +38,7 @@ public class StubServiceEmitter extends ServiceEmitter 
implements MetricsVerifie
 {
   private final List<Event> events = new ArrayList<>();
   private final List<AlertEvent> alertEvents = new ArrayList<>();
-  private final Map<String, List<ServiceMetricEvent>> metricEvents = new 
HashMap<>();
+  private final ConcurrentHashMap<String, List<ServiceMetricEvent>> 
metricEvents = new ConcurrentHashMap<>();
 
   public StubServiceEmitter()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to