This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f33898ed6d Fix durable storage cleanup (#13853)
f33898ed6d is described below
commit f33898ed6d81099a9b73531093e36200bb77f188
Author: Rohan Garg <[email protected]>
AuthorDate: Mon Mar 6 09:49:14 2023 +0530
Fix durable storage cleanup (#13853)
---
.../java/org/apache/druid/msq/exec/WorkerImpl.java | 37 +++++++++++-----------
.../storage/s3/output/S3StorageConnector.java | 17 +++++++++-
2 files changed, 34 insertions(+), 20 deletions(-)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index 8dfe30b057..fa29cd2c01 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -822,25 +822,24 @@ public class WorkerImpl implements Worker
continue;
}
output.close();
-
- // One caveat with this approach is that in case of a worker crash, while the MM/Indexer systems will delete their
- // temp directories where intermediate results were stored, it won't be the case for the external storage.
- // Therefore, the logic for cleaning the stage output in case of a worker/machine crash has to be external.
- // We currently take care of this in the controller.
- if (durableStageStorageEnabled && removeDurableStorageFiles) {
- final String folderName = DurableStorageUtils.getTaskIdOutputsFolderName(
- task.getControllerTaskId(),
- stageId.getStageNumber(),
- task.getWorkerNumber(),
- task.getId()
- );
- try {
- MSQTasks.makeStorageConnector(context.injector()).deleteRecursively(folderName);
- }
- catch (Exception e) {
- // If an error is thrown while cleaning up a file, log it and try to continue with the cleanup
- log.warn(e, "Error while cleaning up folder at path " + folderName);
- }
+ }
+ // One caveat with this approach is that in case of a worker crash, while the MM/Indexer systems will delete their
+ // temp directories where intermediate results were stored, it won't be the case for the external storage.
+ // Therefore, the logic for cleaning the stage output in case of a worker/machine crash has to be external.
+ // We currently take care of this in the controller.
+ if (durableStageStorageEnabled && removeDurableStorageFiles) {
+ final String folderName = DurableStorageUtils.getTaskIdOutputsFolderName(
+ task.getControllerTaskId(),
+ stageId.getStageNumber(),
+ task.getWorkerNumber(),
+ task.getId()
+ );
+ try {
+ MSQTasks.makeStorageConnector(context.injector()).deleteRecursively(folderName);
+ }
+ catch (Exception e) {
+ // If an error is thrown while cleaning up a file, log it and try to continue with the cleanup
+ log.warn(e, "Error while cleaning up folder at path " + folderName);
}
}
}
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
index ea583290af..66391ead4d 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
@@ -121,6 +121,7 @@ public class S3StorageConnector implements StorageConnector
} else {
readEnd = this.s3Client.getObjectMetadata(config.getBucket(), objectPath(path)).getInstanceLength();
}
+ AtomicBoolean isSequenceStreamClosed = new AtomicBoolean(false);
// build a sequence input stream from chunks
return new SequenceInputStream(new Enumeration<InputStream>()
@@ -128,6 +129,12 @@ public class S3StorageConnector implements StorageConnector
@Override
public boolean hasMoreElements()
{
+ // checking if the stream was already closed. If it was, then don't iterate over the remaining chunks
+ // SequenceInputStream's close method closes all the chunk streams in its close. Since we're opening them
+ // lazily, we don't need to close them.
+ if (isSequenceStreamClosed.get()) {
+ return false;
+ }
// don't stop until the whole object is downloaded
return currReadStart.get() < readEnd;
}
@@ -212,7 +219,15 @@ public class S3StorageConnector implements StorageConnector
throw new RE(e, StringUtils.format("Unable to find temp file [%s]", outFile));
}
}
- });
+ })
+ {
+ @Override
+ public void close() throws IOException
+ {
+ isSequenceStreamClosed.set(true);
+ super.close();
+ }
+ };
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]