kfaraz commented on code in PR #16481:
URL: https://github.com/apache/druid/pull/16481#discussion_r1616512750
##########
extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java:
##########
@@ -146,6 +148,10 @@ public void configure(Binder binder)
.addValue(
ServerSideEncryptingAmazonS3.class,
new ServerSideEncryptingAmazonS3(null, new
NoopServerSideEncryption())
+ )
+ .addValue(
+ ExecutorService.class,
+ Execs.multiThreaded(10, "UploadThreadPool-%d")
Review Comment:
Why do you need 10 threads in a test?
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java:
##########
@@ -182,4 +186,13 @@ public Supplier<ServerSideEncryptingAmazonS3>
getAmazonS3ClientSupplier(
{
return Suppliers.memoize(serverSideEncryptingAmazonS3Builder::build);
}
+
+ @Provides
+ @LazySingleton
+ @Named("S3UploadThreadPool")
+ public ExecutorService getUploadPoolExecutorService(ScheduledExecutorFactory
scheduledExecutorFactory, RuntimeInfo runtimeInfo)
+ {
+ int poolSize = Math.max(4, 3 * runtimeInfo.getAvailableProcessors());
Review Comment:
`3 x numProcessors` seems like too many threads, since other thread pools
will be running on the same node too.
I think just `numProcessors` is high enough.
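
For illustration, a minimal sketch of the sizing suggested above, assuming the existing floor of 4 threads is kept (the comment does not say whether it should stay). The class and method names are hypothetical, and the processor count is passed in as a plain `int` rather than read from `RuntimeInfo`:

```java
public final class UploadPoolSizing
{
  private UploadPoolSizing()
  {
  }

  /**
   * Hypothetical helper: one upload thread per available processor.
   * Keeping the floor of 4 from the original diff is an assumption.
   */
  static int computePoolSize(int availableProcessors)
  {
    return Math.max(4, availableProcessors);
  }

  public static void main(String[] args)
  {
    // Stand-in for runtimeInfo.getAvailableProcessors() in the PR.
    int processors = Runtime.getRuntime().availableProcessors();
    System.out.println("upload pool size = " + computePoolSize(processors));
  }
}
```

With this sizing the upload pool no longer grows to three times the core count, which leaves more headroom for the other pools on the node.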
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java:
##########
@@ -182,4 +186,13 @@ public Supplier<ServerSideEncryptingAmazonS3>
getAmazonS3ClientSupplier(
{
return Suppliers.memoize(serverSideEncryptingAmazonS3Builder::build);
}
+
+ @Provides
+ @LazySingleton
+ @Named("S3UploadThreadPool")
Review Comment:
This string should be a constant somewhere.
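
One possible shape for this, as a sketch only: the holder class and field name below are hypothetical, and the constant could just as well live in an existing class of the extension.

```java
public final class S3UploadNames
{
  /**
   * Binding name for the upload executor. Because this is a compile-time
   * constant, it can be used as an annotation value, e.g.
   * {@code @Named(S3UploadNames.UPLOAD_THREAD_POOL)}, at both the provider
   * method and every injection point.
   */
  public static final String UPLOAD_THREAD_POOL = "S3UploadThreadPool";

  private S3UploadNames()
  {
    // Constants holder, not meant to be instantiated.
  }
}
```

Referencing the constant at every site means a typo becomes a compile error instead of a missing binding at injection time.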
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java:
##########
@@ -199,15 +229,44 @@ private void pushCurrentChunk() throws IOException
{
currentChunk.close();
final Chunk chunk = currentChunk;
- try {
- if (chunk.length() > 0) {
- resultsSize += chunk.length();
+ if (chunk.length() > 0) {
+ s3UploadConfig.incrementCurrentNumChunks();
Review Comment:
+1, I think this information can be tracked in a singleton called
`S3UploadManager` or something similar. It is also a little weird to inject
the `ExecutorService` everywhere. That executor service can live inside
`S3UploadManager` so that we have a single place to manage the lifecycle of
the executor. Individual `RetryableS3OutputStream` instances would then
submit jobs to the `S3UploadManager`.
It would be this manager that is injected everywhere instead of the raw
`ExecutorService`.
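
A rough sketch of what such a manager could look like, using only `java.util.concurrent`. Everything here is an assumption for illustration: the class name, the `queueChunkForUpload` method, and the pending-chunk counter are hypothetical, and the Druid lifecycle and Guice wiring are left out.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

public final class S3UploadManagerSketch implements AutoCloseable
{
  // The executor lives here, so its lifecycle is managed in one place.
  private final ExecutorService uploadExecutor;
  // Number of chunks queued or uploading, shared across all output streams.
  private final AtomicInteger pendingChunks = new AtomicInteger();

  public S3UploadManagerSketch(int poolSize)
  {
    this.uploadExecutor = Executors.newFixedThreadPool(poolSize);
  }

  /**
   * Called by an output stream instead of touching the executor directly.
   * The upload itself is passed in as a Runnable to keep the sketch small.
   */
  public Future<?> queueChunkForUpload(Runnable uploadJob)
  {
    pendingChunks.incrementAndGet();
    try {
      return uploadExecutor.submit(() -> {
        try {
          uploadJob.run();
        }
        finally {
          // Runs whether the upload succeeded or failed.
          pendingChunks.decrementAndGet();
        }
      });
    }
    catch (RejectedExecutionException e) {
      // The job never ran, so undo the increment before rethrowing.
      pendingChunks.decrementAndGet();
      throw e;
    }
  }

  public int getPendingChunks()
  {
    return pendingChunks.get();
  }

  @Override
  public void close()
  {
    // Stops accepting new jobs; already submitted uploads still finish.
    uploadExecutor.shutdown();
  }
}
```

With this shape, each stream depends only on the manager, and the chunk bookkeeping no longer has to sit in the upload config.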
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java:
##########
@@ -182,4 +186,13 @@ public Supplier<ServerSideEncryptingAmazonS3>
getAmazonS3ClientSupplier(
{
return Suppliers.memoize(serverSideEncryptingAmazonS3Builder::build);
}
+
+ @Provides
+ @LazySingleton
+ @Named("S3UploadThreadPool")
+ public ExecutorService getUploadPoolExecutorService(ScheduledExecutorFactory
scheduledExecutorFactory, RuntimeInfo runtimeInfo)
Review Comment:
```suggestion
public ExecutorService getUploadExecutorService(ScheduledExecutorFactory
scheduledExecutorFactory, RuntimeInfo runtimeInfo)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]