This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f33898ed6d Fix durable storage cleanup (#13853)
f33898ed6d is described below

commit f33898ed6d81099a9b73531093e36200bb77f188
Author: Rohan Garg <[email protected]>
AuthorDate: Mon Mar 6 09:49:14 2023 +0530

    Fix durable storage cleanup (#13853)
---
 .../java/org/apache/druid/msq/exec/WorkerImpl.java | 37 +++++++++++-----------
 .../storage/s3/output/S3StorageConnector.java      | 17 +++++++++-
 2 files changed, 34 insertions(+), 20 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index 8dfe30b057..fa29cd2c01 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -822,25 +822,24 @@ public class WorkerImpl implements Worker
         continue;
       }
       output.close();
-
-      // One caveat with this approach is that in case of a worker crash, while the MM/Indexer systems will delete their
-      // temp directories where intermediate results were stored, it won't be the case for the external storage.
-      // Therefore, the logic for cleaning the stage output in case of a worker/machine crash has to be external.
-      // We currently take care of this in the controller.
-      if (durableStageStorageEnabled && removeDurableStorageFiles) {
-        final String folderName = DurableStorageUtils.getTaskIdOutputsFolderName(
-            task.getControllerTaskId(),
-            stageId.getStageNumber(),
-            task.getWorkerNumber(),
-            task.getId()
-        );
-        try {
-          MSQTasks.makeStorageConnector(context.injector()).deleteRecursively(folderName);
-        }
-        catch (Exception e) {
-          // If an error is thrown while cleaning up a file, log it and try to continue with the cleanup
-          log.warn(e, "Error while cleaning up folder at path " + folderName);
-        }
+    }
+    // One caveat with this approach is that in case of a worker crash, while the MM/Indexer systems will delete their
+    // temp directories where intermediate results were stored, it won't be the case for the external storage.
+    // Therefore, the logic for cleaning the stage output in case of a worker/machine crash has to be external.
+    // We currently take care of this in the controller.
+    if (durableStageStorageEnabled && removeDurableStorageFiles) {
+      final String folderName = DurableStorageUtils.getTaskIdOutputsFolderName(
+          task.getControllerTaskId(),
+          stageId.getStageNumber(),
+          task.getWorkerNumber(),
+          task.getId()
+      );
+      try {
+        MSQTasks.makeStorageConnector(context.injector()).deleteRecursively(folderName);
+      }
+      catch (Exception e) {
+        // If an error is thrown while cleaning up a file, log it and try to continue with the cleanup
+        log.warn(e, "Error while cleaning up folder at path " + folderName);
       }
     }
   }
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
index ea583290af..66391ead4d 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
@@ -121,6 +121,7 @@ public class S3StorageConnector implements StorageConnector
     } else {
      readEnd = this.s3Client.getObjectMetadata(config.getBucket(), objectPath(path)).getInstanceLength();
     }
+    AtomicBoolean isSequenceStreamClosed = new AtomicBoolean(false);
 
     // build a sequence input stream from chunks
     return new SequenceInputStream(new Enumeration<InputStream>()
@@ -128,6 +129,12 @@ public class S3StorageConnector implements StorageConnector
       @Override
       public boolean hasMoreElements()
       {
+        // checking if the stream was already closed. If it was, then don't iterate over the remaining chunks
+        // SequenceInputStream's close method closes all the chunk streams in its close. Since we're opening them
+        // lazily, we don't need to close them.
+        if (isSequenceStreamClosed.get()) {
+          return false;
+        }
         // don't stop until the whole object is downloaded
         return currReadStart.get() < readEnd;
       }
@@ -212,7 +219,15 @@ public class S3StorageConnector implements StorageConnector
          throw new RE(e, StringUtils.format("Unable to find temp file [%s]", outFile));
         }
       }
-    });
+    })
+    {
+      @Override
+      public void close() throws IOException
+      {
+        isSequenceStreamClosed.set(true);
+        super.close();
+      }
+    };
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to