Akshat-Jain commented on code in PR #16481:
URL: https://github.com/apache/druid/pull/16481#discussion_r1615484400
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java:
##########
@@ -199,15 +211,47 @@ private void pushCurrentChunk() throws IOException
{
currentChunk.close();
final Chunk chunk = currentChunk;
- try {
- if (chunk.length() > 0) {
- resultsSize += chunk.length();
+ if (chunk.length() > 0) {
+ try {
+ SEMAPHORE.acquire(); // Acquire a permit from the semaphore
+ pendingFiles.incrementAndGet();
+
+ UPLOAD_EXECUTOR.submit(() -> {
+ try {
+ uploadChunk(chunk);
+ }
+ catch (Exception e) {
+ error = true;
+ LOG.error(e, e.getMessage());
+ throw new RuntimeException(e);
+ }
+ finally {
+ SEMAPHORE.release(); // Release the permit after upload is
completed
Review Comment:
@LakshSingla `acquire` needs to block the main thread (not the upload
threads), hence it's acquired by the main thread. And since we don't block the
main thread by waiting for results of the individual upload threads (except the
last part when we complete the multipart upload), the semaphore is released by
the individual upload threads.
Also, Kashif suggested to move away from semaphore in favor of a different
approach, and I'm working on making that change currently:
https://github.com/apache/druid/pull/16481#discussion_r1609581399
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]