Akshat-Jain commented on code in PR #16481:
URL: https://github.com/apache/druid/pull/16481#discussion_r1618816610
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java:
##########
@@ -199,15 +221,44 @@ private void pushCurrentChunk() throws IOException
{
currentChunk.close();
final Chunk chunk = currentChunk;
- try {
- if (chunk.length() > 0) {
- resultsSize += chunk.length();
+ if (chunk.length() > 0) {
+ uploadManager.incrementCurrentNumChunks();
+ pendingFiles.incrementAndGet();
- pushStopwatch.start();
- pushResults.add(push(chunk));
- pushStopwatch.stop();
- numChunksPushed++;
+ uploadManager.submitTask(() -> {
+ try {
+ uploadChunk(chunk);
+ }
+ catch (Exception e) {
+ error = true;
+ LOG.error(e, e.getMessage());
+ throw new RuntimeException(e);
+ }
+ finally {
+ synchronized (maxChunksLock) {
+ uploadManager.decrementCurrentNumChunks();
+ maxChunksLock.notifyAll();
+ }
+ if (pendingFiles.decrementAndGet() == 0) {
+ synchronized (fileLock) {
+ fileLock.notifyAll();
+ }
+ }
+ }
+ });
+ }
+ }
Review Comment:
@kfaraz I tried sketching out how it could look, and I don't think it would
be clean to move the upload functionality out of the
RetryableS3OutputStream file.
RetryableS3OutputStream is a per-`uploadId` class. Most, if not all, of its
fields hold per-upload-ID state.
To move the upload functionality to a separate singleton class like
`S3UploadManager`, we would need a `ConcurrentHashMap: uploadId -> xyz` in the
singleton for every field like `pendingFiles`, `fileLock`, `resultsSize`,
`numChunksPushed`, `pushStopwatch`, `pushResults`, `error`, `closed`, etc.,
which seems like overkill to me.
We'd also have to pass `S3OutputConfig`, `s3Key`, and the S3 client to the
singleton class, that is, to `S3UploadManager`.
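To make the concern concrete, here is a rough sketch of what the singleton
would start to look like if it owned per-upload state. Everything in it is
hypothetical (the class name, the method names, and the choice of which fields
to show); it is not code from this PR, only an illustration of the
map-per-field shape.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch only: NOT the actual S3UploadManager from this PR.
class SingletonUploadStateSketch
{
  // One map per field that is currently a plain instance field on the stream.
  private final ConcurrentHashMap<String, AtomicInteger> pendingFiles = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, AtomicLong> resultsSize = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, Boolean> error = new ConcurrentHashMap<>();
  // ...and similarly for fileLock, numChunksPushed, pushStopwatch, pushResults, closed.

  // Record that a chunk of the given length was handed off for upload.
  void chunkSubmitted(String uploadId, long chunkLength)
  {
    pendingFiles.computeIfAbsent(uploadId, k -> new AtomicInteger()).incrementAndGet();
    resultsSize.computeIfAbsent(uploadId, k -> new AtomicLong()).addAndGet(chunkLength);
  }

  // Record that a chunk finished; returns how many chunks are still pending.
  // Assumes chunkSubmitted was called first for this uploadId.
  int chunkFinished(String uploadId, boolean failed)
  {
    if (failed) {
      error.put(uploadId, true);
    }
    return pendingFiles.get(uploadId).decrementAndGet();
  }

  boolean hasError(String uploadId)
  {
    return error.getOrDefault(uploadId, false);
  }

  long resultsSize(String uploadId)
  {
    AtomicLong size = resultsSize.get(uploadId);
    return size == null ? 0L : size.get();
  }

  // Every upload also needs explicit cleanup, otherwise the maps leak entries.
  void uploadCompleted(String uploadId)
  {
    pendingFiles.remove(uploadId);
    resultsSize.remove(uploadId);
    error.remove(uploadId);
  }
}
```

Even with only three of the fields shown, every access goes through a map
lookup keyed by `uploadId`, and the singleton has to be told when an upload is
done so it can drop the entries.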
Thoughts? Maybe we can rename `S3UploadManager` to something that indicates
it doesn't do the actual upload? The original intention was to keep and
manage only the global (that is, non-uploadId-specific) components in the
singleton class, since RetryableS3OutputStream would otherwise have needed
static fields to achieve that.
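As a minimal sketch of that split, assuming a global cap on in-flight chunks
and a shared thread pool: the class names, constructor parameters, and methods
below are all hypothetical and simplified, and they are not the PR's actual
classes. The singleton-like class holds only the global counter and executor,
while the per-upload class keeps its own counters as ordinary fields.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the singleton: global state only, no upload logic.
class GlobalChunkLimiterSketch
{
  private final int maxChunks;
  private final ExecutorService executor;
  private final Object maxChunksLock = new Object();
  private int currentNumChunks = 0;

  GlobalChunkLimiterSketch(int maxChunks, int threads)
  {
    this.maxChunks = maxChunks;
    this.executor = Executors.newFixedThreadPool(threads);
  }

  // Blocks while the global limit is reached, then runs the task on the shared pool.
  void submitTask(Runnable task) throws InterruptedException
  {
    synchronized (maxChunksLock) {
      while (currentNumChunks >= maxChunks) {
        maxChunksLock.wait();
      }
      currentNumChunks++;
    }
    executor.submit(() -> {
      try {
        task.run();
      }
      finally {
        synchronized (maxChunksLock) {
          currentNumChunks--;
          maxChunksLock.notifyAll();
        }
      }
    });
  }

  void shutdown() throws InterruptedException
  {
    executor.shutdown();
    executor.awaitTermination(10, TimeUnit.SECONDS);
  }
}

// Hypothetical stand-in for the per-upload stream: its state stays in plain fields.
class PerUploadStreamSketch
{
  private final GlobalChunkLimiterSketch limiter;
  private final AtomicInteger pendingFiles = new AtomicInteger();
  private final AtomicInteger numChunksPushed = new AtomicInteger();
  private final Object fileLock = new Object();
  private volatile boolean error = false;

  PerUploadStreamSketch(GlobalChunkLimiterSketch limiter)
  {
    this.limiter = limiter;
  }

  // The upload itself is passed in as a Runnable to keep the sketch self-contained.
  void pushChunk(Runnable upload) throws InterruptedException
  {
    pendingFiles.incrementAndGet();
    limiter.submitTask(() -> {
      try {
        upload.run();
        numChunksPushed.incrementAndGet();
      }
      catch (RuntimeException e) {
        error = true;
      }
      finally {
        if (pendingFiles.decrementAndGet() == 0) {
          synchronized (fileLock) {
            fileLock.notifyAll();
          }
        }
      }
    });
  }

  // Waits until every chunk submitted so far has finished, successfully or not.
  void awaitUploads() throws InterruptedException
  {
    synchronized (fileLock) {
      while (pendingFiles.get() > 0) {
        fileLock.wait();
      }
    }
  }

  int chunksPushed()
  {
    return numChunksPushed.get();
  }

  boolean hasError()
  {
    return error;
  }
}
```

With this shape, the singleton never needs to know about `uploadId`, and the
stream never needs static fields; the only thing shared across uploads is the
chunk limit and the thread pool.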
I'll wait for your thoughts before tinkering further on this. Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]