dmvk commented on a change in pull request #18536:
URL: https://github.com/apache/flink/pull/18536#discussion_r798483128
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
##########
@@ -873,69 +878,96 @@ public boolean deletePermanent(JobID jobId, PermanentBlobKey key) {
* doesn't touch the job's entry in the {@link BlobStore} to enable recovering.
*
* @param jobId The {@code JobID} of the job that is subject to cleanup.
- * @throws IOException if the cleanup failed.
*/
@Override
- public void localCleanup(JobID jobId) throws IOException {
+ public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor cleanupExecutor) {
checkNotNull(jobId);
+ return CompletableFuture.runAsync(
+ () -> {
+ readWriteLock.writeLock().lock();
+ try {
+ internalLocalCleanup(jobId);
+ } catch (IOException e) {
+ throw new CompletionException(e);
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ },
+ cleanupExecutor);
+ }
+
+ @GuardedBy("readWriteLock")
+ private void internalLocalCleanup(JobID jobId) throws IOException {
final File jobDir =
new File(
BlobUtils.getStorageLocationPath(
storageDir.deref().getAbsolutePath(), jobId));
-
FileUtils.deleteDirectory(jobDir);
- // NOTE on why blobExpiryTimes are not cleaned up:
- // Instead of going through blobExpiryTimes, keep lingering entries - they
- // will be cleaned up by the timer task which tolerates non-existing files
- // If inserted again with the same IDs (via put()), the TTL will be updated
- // again.
+ // NOTE on why blobExpiryTimes are not cleaned up: Instead of going through
+ // blobExpiryTimes, keep lingering entries. They will be cleaned up by the timer
+ // task which tolerates non-existing files. If inserted again with the same IDs
+ // (via put()), the TTL will be updated again.
}
/**
* Removes all BLOBs from local and HA store belonging to the given {@link JobID}.
*
* @param jobId ID of the job this blob belongs to
- * @throws Exception if the cleanup fails.
*/
@Override
- public void globalCleanup(JobID jobId) throws Exception {
+ public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) {
checkNotNull(jobId);
- readWriteLock.writeLock().lock();
+ return CompletableFuture.runAsync(
+ () -> {
+ readWriteLock.writeLock().lock();
- try {
- Exception exception = null;
+ try {
+ IOException exception = null;
- try {
- localCleanup(jobId);
- } catch (IOException e) {
- exception = e;
- }
+ try {
+ internalLocalCleanup(jobId);
+ } catch (IOException e) {
+ exception = e;
+ }
- if (!blobStore.deleteAll(jobId)) {
- exception =
- ExceptionUtils.firstOrSuppressed(
- new FlinkException(
- "Error while cleaning up the BlobStore
for job " + jobId),
- exception);
- }
+ if (!blobStore.deleteAll(jobId)) {
+ exception =
+ ExceptionUtils.firstOrSuppressed(
+ new IOException(
+ "Error while cleaning up
the BlobStore for job "
+ + jobId),
+ exception);
+ }
- ExceptionUtils.tryRethrowException(exception);
- } finally {
- readWriteLock.writeLock().unlock();
- }
+ if (exception != null) {
+ throw new CompletionException(exception);
+ }
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ },
+ executor);
}
- public void retainJobs(Collection<JobID> jobsToRetain) throws Exception {
+ public void retainJobs(Collection<JobID> jobsToRetain, Executor ioExecutor) throws IOException {
if (storageDir.deref().exists()) {
final Set<JobID> jobsToRemove =
BlobUtils.listExistingJobs(storageDir.deref().toPath());
jobsToRemove.removeAll(jobsToRetain);
+ final Collection<CompletableFuture<Void>> cleanupResultFutures =
+ new ArrayList<>(jobsToRemove.size());
for (JobID jobToRemove : jobsToRemove) {
- globalCleanup(jobToRemove);
+ cleanupResultFutures.add(globalCleanupAsync(jobToRemove, ioExecutor));
+ }
+
+ try {
+ FutureUtils.completeAll(cleanupResultFutures).get();
+ } catch (InterruptedException | ExecutionException e) {
+ ExceptionUtils.rethrowIOException(e);
Review comment:
hmm, I've missed the last branch where we wrap the exception in IOException anyway :/ None of the other branches will match, so this basically has the same semantics as:
```suggestion
throw new IOException(e);
```
which is more explicit 🤔
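
For context on the equivalence being pointed out: a minimal, self-contained sketch, assuming `ExceptionUtils.rethrowIOException` rethrows `IOException`s as-is and wraps everything else in a new `IOException`. The local `rethrowIOException` below is a hypothetical stand-in for that assumed behaviour (not Flink's actual implementation), and `waitForCleanup` / `RethrowEquivalenceSketch` are invented names mirroring the catch block in `retainJobs`:
```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class RethrowEquivalenceSketch {

    // Hypothetical stand-in for the assumed behaviour of
    // ExceptionUtils.rethrowIOException: rethrow IOExceptions as-is,
    // wrap anything else in a new IOException.
    static void rethrowIOException(Throwable t) throws IOException {
        if (t instanceof IOException) {
            throw (IOException) t;
        }
        throw new IOException(t);
    }

    // Mirrors the catch block in retainJobs: wait for a cleanup future and
    // surface any failure as an IOException.
    static void waitForCleanup(CompletableFuture<Void> cleanupFuture) throws IOException {
        try {
            cleanupFuture.get();
        } catch (InterruptedException | ExecutionException e) {
            // Neither exception type here is an IOException, so the helper always
            // takes the wrapping branch; this behaves like `throw new IOException(e)`.
            rethrowIOException(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("cleanup failed"));
        try {
            waitForCleanup(failed);
        } catch (IOException e) {
            System.out.println("caught: " + e + ", cause: " + e.getCause());
        }
    }
}
```
Since the multi-catch only ever sees `InterruptedException` or `ExecutionException`, the wrapping branch is the only one reachable, which is why spelling it out as `throw new IOException(e)` reads more clearly.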