kfaraz commented on code in PR #16481:
URL: https://github.com/apache/druid/pull/16481#discussion_r1609455161


##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java:
##########
@@ -199,15 +211,47 @@ private void pushCurrentChunk() throws IOException
   {
     currentChunk.close();
     final Chunk chunk = currentChunk;
-    try {
-      if (chunk.length() > 0) {
-        resultsSize += chunk.length();
+    if (chunk.length() > 0) {
+      try {
+        SEMAPHORE.acquire(); // Acquire a permit from the semaphore

Review Comment:
   Between the static semaphore, the executor threads and other locks, the code 
has become very difficult to follow and clearly error prone.
   
   If we are trying to avoid running out of disk space, then instead of having 
semaphores, we just need to check some condition in `write()` that there is 
disk space available (this can be computed based on the formula that @cryptoe 
has suggested). If there isn't, we should wait for the condition to be 
satisfied before proceeding with the write. 
   
   There should be a javadoc in `write()` method which also calls this out 
clearly.
   The class level javadoc might need to be updated too based on the changes 
here.
   



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java:
##########
@@ -103,22 +107,32 @@ public class RetryableS3OutputStream extends OutputStream
   private boolean error;
   private boolean closed;
 
-  public RetryableS3OutputStream(
-      S3OutputConfig config,
-      ServerSideEncryptingAmazonS3 s3,
-      String s3Key
-  ) throws IOException
-  {
+  /**
+   * An atomic counter to store number of files pending to be uploaded for the 
particular uploadId.
+   */
+  private final AtomicInteger pendingFiles = new AtomicInteger(0);
 
-    this(config, s3, s3Key, true);
-  }
+  /**
+   * A lock used for notifying the main thread about the completion of 
s3.uploadPart() for all chunks
+   * and hence starting the s3.completeMultipartUpload() for the uploadId.
+   */
+  private final Object fileLock = new Object();
+
+  /**
+   * Semaphore to restrict the maximum number of simultaneous chunks on disk.
+   */
+  private static final int MAX_CONCURRENT_CHUNKS = 10;
+  private static final Semaphore SEMAPHORE = new 
Semaphore(MAX_CONCURRENT_CHUNKS);
 
-  @VisibleForTesting
-  protected RetryableS3OutputStream(
+  /**
+   * Threadpool used for uploading the chunks asynchronously.
+   */
+  private static final ExecutorService UPLOAD_EXECUTOR = 
Execs.multiThreaded(10, "UploadThreadPool-%d");

Review Comment:
   This executor should not be static. If the intention is to share the 
executor, it should be passed in as an argument to the constructor. Also, there 
should be a well defined lifecycle of the executor, i.e. when it is started and 
stopped.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to