kfaraz commented on code in PR #16481:
URL: https://github.com/apache/druid/pull/16481#discussion_r1609455161
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java:
##########
@@ -199,15 +211,47 @@ private void pushCurrentChunk() throws IOException
{
currentChunk.close();
final Chunk chunk = currentChunk;
- try {
- if (chunk.length() > 0) {
- resultsSize += chunk.length();
+ if (chunk.length() > 0) {
+ try {
+ SEMAPHORE.acquire(); // Acquire a permit from the semaphore
Review Comment:
Between the static semaphore, the executor threads, and the other locks, this code has become very difficult to follow and is clearly error-prone.
If the goal is to avoid running out of disk space, then instead of semaphores we just need `write()` to check a condition that disk space is available (this can be computed from the formula that @cryptoe has suggested). If there isn't enough, we should wait for the condition to be satisfied before proceeding with the write.
The `write()` method should have a javadoc that calls this out clearly.
The class-level javadoc might need to be updated too, based on the changes here.
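The condition-based wait the reviewer describes could look roughly like the sketch below: a lock plus `Condition` guards a running count of bytes on disk, `write()` blocks while the budget would be exceeded, and the count is released when an uploaded chunk is deleted. Class and member names (`DiskBoundedWriter`, `maxBytesOnDisk`, `release`) are illustrative only, not Druid's actual API.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: block writes until local disk usage is under budget,
// instead of gating chunks with a static semaphore.
public class DiskBoundedWriter
{
  private final long maxBytesOnDisk;
  private long bytesOnDisk;
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition spaceFreed = lock.newCondition();

  public DiskBoundedWriter(long maxBytesOnDisk)
  {
    this.maxBytesOnDisk = maxBytesOnDisk;
  }

  /**
   * Blocks until writing {@code numBytes} more would not exceed the
   * configured disk budget, then reserves that space.
   */
  public void write(long numBytes) throws InterruptedException
  {
    lock.lock();
    try {
      while (bytesOnDisk + numBytes > maxBytesOnDisk) {
        spaceFreed.await();
      }
      bytesOnDisk += numBytes;
    }
    finally {
      lock.unlock();
    }
  }

  /** Called when an uploaded chunk is deleted from disk, freeing its space. */
  public void release(long numBytes)
  {
    lock.lock();
    try {
      bytesOnDisk -= numBytes;
      spaceFreed.signalAll();
    }
    finally {
      lock.unlock();
    }
  }

  public long getBytesOnDisk()
  {
    lock.lock();
    try {
      return bytesOnDisk;
    }
    finally {
      lock.unlock();
    }
  }
}
```

The advantage over a fixed permit count is that the wait condition can incorporate actual chunk sizes and available disk, rather than an arbitrary cap of ten chunks.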
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java:
##########
@@ -103,22 +107,32 @@ public class RetryableS3OutputStream extends OutputStream
private boolean error;
private boolean closed;
- public RetryableS3OutputStream(
- S3OutputConfig config,
- ServerSideEncryptingAmazonS3 s3,
- String s3Key
- ) throws IOException
- {
+ /**
+ * An atomic counter to store number of files pending to be uploaded for the particular uploadId.
+ */
+ private final AtomicInteger pendingFiles = new AtomicInteger(0);
- this(config, s3, s3Key, true);
- }
+ /**
+ * A lock used for notifying the main thread about the completion of s3.uploadPart() for all chunks
+ * and hence starting the s3.completeMultipartUpload() for the uploadId.
+ */
+ private final Object fileLock = new Object();
+
+ /**
+ * Semaphore to restrict the maximum number of simultaneous chunks on disk.
+ */
+ private static final int MAX_CONCURRENT_CHUNKS = 10;
+ private static final Semaphore SEMAPHORE = new Semaphore(MAX_CONCURRENT_CHUNKS);
- @VisibleForTesting
- protected RetryableS3OutputStream(
+ /**
+ * Threadpool used for uploading the chunks asynchronously.
+ */
+ private static final ExecutorService UPLOAD_EXECUTOR = Execs.multiThreaded(10, "UploadThreadPool-%d");
Review Comment:
This executor should not be static. If the intention is to share the executor, it should be passed in as a constructor argument. There should also be a well-defined lifecycle for the executor, i.e. when it is started and when it is stopped.
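One way to satisfy this comment is sketched below: the executor is either injected by the caller (who then owns its lifecycle) or created privately and shut down in `close()`. The class name `ChunkUploader` and its methods are hypothetical, not the PR's actual code.

```java
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: executor is instance-scoped with an explicit
// lifecycle, rather than a static pool shared by all streams.
public class ChunkUploader implements Closeable
{
  private final ExecutorService uploadExecutor;
  private final boolean ownsExecutor;

  /** Share an executor whose lifecycle is managed by the caller. */
  public ChunkUploader(ExecutorService uploadExecutor)
  {
    this.uploadExecutor = uploadExecutor;
    this.ownsExecutor = false;
  }

  /** Create a private executor; it is stopped in close(). */
  public ChunkUploader(int numThreads)
  {
    this.uploadExecutor = Executors.newFixedThreadPool(numThreads);
    this.ownsExecutor = true;
  }

  public void submitUpload(Runnable uploadTask)
  {
    uploadExecutor.submit(uploadTask);
  }

  @Override
  public void close()
  {
    // Only shut down a pool we created ourselves; a shared pool
    // belongs to the caller.
    if (ownsExecutor) {
      uploadExecutor.shutdown();
      try {
        if (!uploadExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
          uploadExecutor.shutdownNow();
        }
      }
      catch (InterruptedException e) {
        uploadExecutor.shutdownNow();
        Thread.currentThread().interrupt();
      }
    }
  }

  public boolean isExecutorShutdown()
  {
    return uploadExecutor.isShutdown();
  }
}
```

The `ownsExecutor` flag makes the lifecycle explicit: whoever constructs the pool is responsible for stopping it.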
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]