This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new cd438b1918c Emit metrics for S3UploadThreadPool (#16616)
cd438b1918c is described below
commit cd438b1918c1bf925e76ba97b6ce076cde831df4
Author: Akshat Jain <[email protected]>
AuthorDate: Fri Jun 21 11:36:47 2024 +0530
Emit metrics for S3UploadThreadPool (#16616)
* Emit metrics for S3UploadThreadPool
* Address review comments
* Revert unnecessary formatting change
* Revert unnecessary formatting change in metrics.md file
* Address review comments
* Add metric for task duration
* Minor fix in metrics.md
* Add s3Key and uploadId in the log message
* Address review comments
* Create new instance of ServiceMetricEvent.Builder for thread safety
* Address review comments
* Address review comments
---
docs/operations/metrics.md | 13 ++++
.../storage/s3/output/RetryableS3OutputStream.java | 22 +++++--
.../druid/storage/s3/output/S3UploadManager.java | 75 ++++++++++++++++------
.../storage/s3/S3StorageConnectorProviderTest.java | 4 +-
.../s3/output/RetryableS3OutputStreamTest.java | 4 +-
.../storage/s3/output/S3StorageConnectorTest.java | 4 +-
.../storage/s3/output/S3UploadManagerTest.java | 9 ++-
.../java/util/metrics/StubServiceEmitter.java | 4 +-
8 files changed, 104 insertions(+), 31 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index bf241ac5708..1d37169684e 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -508,6 +508,19 @@ These metrics are only available if the `OshiSysMonitor`
module is included.
|`sys/tcpv4/out/rsts`|Total "out reset" packets sent to reset the connection||Generally 0|
|`sys/tcpv4/retrans/segs`|Total segments re-transmitted||Varies|
+
+## S3 multi-part upload
+
+These metrics are only available if the `druid-s3-extensions` module is included and at least one of the following features is in use: MSQ export to S3 or durable intermediate storage on S3.
+
+|Metric|Description|Dimensions|Normal value|
+|------|-----------|----------|------------|
+|`s3/upload/part/queueSize`|Number of items currently waiting in the queue to be uploaded to S3. Each item in the queue corresponds to a single part in a multi-part upload.||Varies|
+|`s3/upload/part/queuedTime`|Milliseconds a single item (part) spends in the queue before its upload to S3 starts.|`uploadId`, `partNumber`|Varies|
+|`s3/upload/part/time`|Milliseconds taken to upload a single part of a multi-part upload to S3, including time spent on any retries.|`uploadId`, `partNumber`|Varies|
+|`s3/upload/total/time`|Milliseconds taken to upload all parts of a multi-part upload to S3.|`uploadId`|Varies|
+|`s3/upload/total/bytes`|Total bytes uploaded to S3 during a multi-part upload.|`uploadId`|Varies|
+
## Cgroup
These metrics are available on operating systems with the cgroup kernel
feature. All the values are derived by reading from `/sys/fs/cgroup`.
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java
index d0e5d0ee3ff..aa672444581 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java
@@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
@@ -69,6 +70,11 @@ import java.util.concurrent.TimeUnit;
*/
public class RetryableS3OutputStream extends OutputStream
{
+ // Metric related constants.
+ private static final String METRIC_PREFIX = "s3/upload/total/";
+ private static final String METRIC_TOTAL_UPLOAD_TIME = METRIC_PREFIX +
"time";
+ private static final String METRIC_TOTAL_UPLOAD_BYTES = METRIC_PREFIX +
"bytes";
+
private static final Logger LOG = new Logger(RetryableS3OutputStream.class);
private final S3OutputConfig config;
@@ -208,14 +214,20 @@ public class RetryableS3OutputStream extends OutputStream
org.apache.commons.io.FileUtils.forceDelete(chunkStorePath);
LOG.info("Deleted chunkStorePath[%s]", chunkStorePath);
- // This should be emitted as a metric
- long totalChunkSize = (currentChunk.id - 1) * chunkSize +
currentChunk.length();
+ final long totalBytesUploaded = (currentChunk.id - 1) * chunkSize +
currentChunk.length();
+ final long totalUploadTimeMillis =
pushStopwatch.elapsed(TimeUnit.MILLISECONDS);
LOG.info(
- "Pushed total [%d] parts containing [%d] bytes in [%d]ms.",
+ "Pushed total [%d] parts containing [%d] bytes in [%d]ms for
s3Key[%s], uploadId[%s].",
futures.size(),
- totalChunkSize,
- pushStopwatch.elapsed(TimeUnit.MILLISECONDS)
+ totalBytesUploaded,
+ totalUploadTimeMillis,
+ s3Key,
+ uploadId
);
+
+ final ServiceMetricEvent.Builder builder = new
ServiceMetricEvent.Builder().setDimension("uploadId", uploadId);
+ uploadManager.emitMetric(builder.setMetric(METRIC_TOTAL_UPLOAD_TIME,
totalUploadTimeMillis));
+ uploadManager.emitMetric(builder.setMetric(METRIC_TOTAL_UPLOAD_BYTES,
totalBytesUploaded));
});
try (Closer ignored = closer) {
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java
index 9caa2bcb2e3..cc9ce4bf15a 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java
@@ -25,10 +25,13 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import org.apache.druid.utils.RuntimeInfo;
@@ -36,6 +39,7 @@ import org.apache.druid.utils.RuntimeInfo;
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* This class manages uploading files to S3 in chunks, while ensuring that the
@@ -44,18 +48,34 @@ import java.util.concurrent.Future;
@ManageLifecycle
public class S3UploadManager
{
+ // Metric related constants.
+ private static final String METRIC_PREFIX = "s3/upload/part/";
+ private static final String METRIC_PART_QUEUED_TIME = METRIC_PREFIX +
"queuedTime";
+ private static final String METRIC_QUEUE_SIZE = METRIC_PREFIX + "queueSize";
+ private static final String METRIC_PART_UPLOAD_TIME = METRIC_PREFIX + "time";
+
private final ExecutorService uploadExecutor;
+ private final ServiceEmitter emitter;
private static final Logger log = new Logger(S3UploadManager.class);
+ // For metrics regarding uploadExecutor.
+ private final AtomicInteger executorQueueSize = new AtomicInteger(0);
+
@Inject
- public S3UploadManager(S3OutputConfig s3OutputConfig, S3ExportConfig
s3ExportConfig, RuntimeInfo runtimeInfo)
+ public S3UploadManager(
+ S3OutputConfig s3OutputConfig,
+ S3ExportConfig s3ExportConfig,
+ RuntimeInfo runtimeInfo,
+ ServiceEmitter emitter
+ )
{
int poolSize = Math.max(4, runtimeInfo.getAvailableProcessors());
int maxNumChunksOnDisk = computeMaxNumChunksOnDisk(s3OutputConfig,
s3ExportConfig);
this.uploadExecutor = createExecutorService(poolSize, maxNumChunksOnDisk);
log.info("Initialized executor service for S3 multipart upload with pool
size [%d] and work queue capacity [%d]",
poolSize, maxNumChunksOnDisk);
+ this.emitter = emitter;
}
/**
@@ -87,25 +107,36 @@ public class S3UploadManager
S3OutputConfig config
)
{
- return uploadExecutor.submit(() -> RetryUtils.retry(
- () -> {
- log.debug("Uploading chunk[%d] for uploadId[%s].", chunkNumber,
uploadId);
- UploadPartResult uploadPartResult = uploadPartIfPossible(
- s3Client,
- uploadId,
- config.getBucket(),
- key,
- chunkNumber,
- chunkFile
- );
- if (!chunkFile.delete()) {
- log.warn("Failed to delete chunk [%s]",
chunkFile.getAbsolutePath());
- }
- return uploadPartResult;
- },
- S3Utils.S3RETRY,
- config.getMaxRetry()
- ));
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+ executorQueueSize.incrementAndGet();
+ return uploadExecutor.submit(() -> {
+ final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
+ emitMetric(metricBuilder.setMetric(METRIC_QUEUE_SIZE,
executorQueueSize.decrementAndGet()));
+ metricBuilder.setDimension("uploadId",
uploadId).setDimension("partNumber", chunkNumber);
+ emitMetric(metricBuilder.setMetric(METRIC_PART_QUEUED_TIME,
stopwatch.millisElapsed()));
+ stopwatch.restart();
+
+ return RetryUtils.retry(
+ () -> {
+ log.debug("Uploading chunk[%d] for uploadId[%s].", chunkNumber,
uploadId);
+ UploadPartResult uploadPartResult = uploadPartIfPossible(
+ s3Client,
+ uploadId,
+ config.getBucket(),
+ key,
+ chunkNumber,
+ chunkFile
+ );
+ if (!chunkFile.delete()) {
+ log.warn("Failed to delete chunk [%s]",
chunkFile.getAbsolutePath());
+ }
+ emitMetric(metricBuilder.setMetric(METRIC_PART_UPLOAD_TIME,
stopwatch.millisElapsed()));
+ return uploadPartResult;
+ },
+ S3Utils.S3RETRY,
+ config.getMaxRetry()
+ );
+ });
}
@VisibleForTesting
@@ -149,4 +180,8 @@ public class S3UploadManager
uploadExecutor.shutdown();
}
+ protected void emitMetric(ServiceMetricEvent.Builder builder)
+ {
+ emitter.emit(builder);
+ }
}
diff --git
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java
index 676352daf4f..a880d6f2efa 100644
---
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java
+++
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorModule;
@@ -158,7 +159,8 @@ public class S3StorageConnectorProviderTest
new S3UploadManager(
new S3OutputConfig("bucket", "prefix",
EasyMock.mock(File.class), new HumanReadableBytes("5MiB"), 1),
new S3ExportConfig("tempDir", new
HumanReadableBytes("5MiB"), 1, null),
- new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0))
+ new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0),
+ new StubServiceEmitter())
)
);
diff --git
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java
index 8e7a81eb48d..8d15624c0d0 100644
---
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java
+++
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java
@@ -33,6 +33,7 @@ import com.amazonaws.services.s3.model.UploadPartResult;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.storage.s3.NoopServerSideEncryption;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
@@ -105,7 +106,8 @@ public class RetryableS3OutputStreamTest
s3UploadManager = new S3UploadManager(
new S3OutputConfig("bucket", "prefix", EasyMock.mock(File.class), new
HumanReadableBytes("5MiB"), 1),
new S3ExportConfig("tempDir", new HumanReadableBytes("5MiB"), 1, null),
- new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0));
+ new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0),
+ new StubServiceEmitter());
}
@Test
diff --git
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java
index 67dcb3b6db6..68eaca1c42a 100644
---
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java
+++
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java
@@ -32,6 +32,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.s3.NoopServerSideEncryption;
@@ -90,7 +91,8 @@ public class S3StorageConnectorTest
storageConnector = new S3StorageConnector(s3OutputConfig, service, new
S3UploadManager(
s3OutputConfig,
new S3ExportConfig("tempDir", new HumanReadableBytes("5MiB"), 1,
null),
- new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0)));
+ new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0),
+ new StubServiceEmitter()));
}
catch (IOException e) {
throw new RuntimeException(e);
diff --git
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3UploadManagerTest.java
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3UploadManagerTest.java
index b79c392844d..75305a3c95a 100644
---
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3UploadManagerTest.java
+++
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3UploadManagerTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.storage.s3.output;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import org.apache.druid.utils.RuntimeInfo;
@@ -43,14 +44,16 @@ public class S3UploadManagerTest
private S3UploadManager s3UploadManager;
private S3OutputConfig s3OutputConfig;
private S3ExportConfig s3ExportConfig;
+ private StubServiceEmitter serviceEmitter;
@Before
public void setUp()
{
s3OutputConfig = new S3OutputConfig("bucket", "prefix",
EasyMock.mock(File.class), new HumanReadableBytes("100MiB"), 1);
s3ExportConfig = new S3ExportConfig("tempDir", new
HumanReadableBytes("200MiB"), 1, null);
+ serviceEmitter = new StubServiceEmitter();
final RuntimeInfo runtimeInfo = new
DruidProcessingConfigTest.MockRuntimeInfo(8, 0, 0);
- s3UploadManager = new S3UploadManager(s3OutputConfig, s3ExportConfig,
runtimeInfo);
+ s3UploadManager = new S3UploadManager(s3OutputConfig, s3ExportConfig,
runtimeInfo, serviceEmitter);
}
@Test
@@ -75,6 +78,10 @@ public class S3UploadManagerTest
UploadPartResult futureResult = result.get();
Assert.assertEquals(chunkId, futureResult.getPartNumber());
Assert.assertEquals("etag", futureResult.getETag());
+
+ serviceEmitter.verifyEmitted("s3/upload/part/queuedTime", 1);
+ serviceEmitter.verifyEmitted("s3/upload/part/queueSize", 1);
+ serviceEmitter.verifyEmitted("s3/upload/part/time", 1);
}
@Test
diff --git
a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
index 2ddba7c6cd8..e4a8b9403dd 100644
---
a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
+++
b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
@@ -26,9 +26,9 @@ import
org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Test implementation of {@link ServiceEmitter} that collects emitted metrics
@@ -38,7 +38,7 @@ public class StubServiceEmitter extends ServiceEmitter
implements MetricsVerifie
{
private final List<Event> events = new ArrayList<>();
private final List<AlertEvent> alertEvents = new ArrayList<>();
- private final Map<String, List<ServiceMetricEvent>> metricEvents = new
HashMap<>();
+ private final ConcurrentHashMap<String, List<ServiceMetricEvent>>
metricEvents = new ConcurrentHashMap<>();
public StubServiceEmitter()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]