XComp commented on a change in pull request #18536:
URL: https://github.com/apache/flink/pull/18536#discussion_r796542238
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
##########
@@ -861,61 +869,73 @@ public boolean deletePermanent(JobID jobId,
PermanentBlobKey key) {
}
/**
- * Removes all BLOBs from local and HA store belonging to the given job ID.
+ * Deletes locally stored artifacts for the job represented by the given
{@link JobID}. This
+ * doesn't touch the job's entry in the {@link BlobStore} to enable
recovering.
*
- * @param jobId ID of the job this blob belongs to
- * @param cleanupBlobStoreFiles True if the corresponding blob store files
shall be cleaned up
- * as well. Otherwise false.
- * @return <tt>true</tt> if the job directory is successfully deleted or
non-existing;
- * <tt>false</tt> otherwise
+ * @param jobId The {@code JobID} of the job that is subject to cleanup.
+ * @throws IOException if the cleanup failed.
*/
- public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) {
+ @Override
+ public void localCleanup(JobID jobId) throws IOException {
checkNotNull(jobId);
final File jobDir =
new File(
BlobUtils.getStorageLocationPath(
storageDir.deref().getAbsolutePath(), jobId));
+ FileUtils.deleteDirectory(jobDir);
+
+ // NOTE on why blobExpiryTimes are not cleaned up:
+ // Instead of going through blobExpiryTimes, keep lingering
entries - they
+ // will be cleaned up by the timer task which tolerates
non-existing files
+ // If inserted again with the same IDs (via put()), the TTL will
be updated
+ // again.
+ }
+
+ /**
+ * Removes all BLOBs from local and HA store belonging to the given {@link
JobID}.
+ *
+ * @param jobId ID of the job this blob belongs to
+ * @throws Exception if the cleanup fails.
+ */
+ @Override
+ public void globalCleanup(JobID jobId) throws Exception {
+ checkNotNull(jobId);
+
readWriteLock.writeLock().lock();
try {
- // delete locally
- boolean deletedLocally = false;
- try {
- FileUtils.deleteDirectory(jobDir);
-
- // NOTE on why blobExpiryTimes are not cleaned up:
- // Instead of going through blobExpiryTimes, keep
lingering entries - they
- // will be cleaned up by the timer task which tolerates
non-existing files
- // If inserted again with the same IDs (via put()), the
TTL will be updated
- // again.
+ Exception exception = null;
- deletedLocally = true;
+ try {
+ localCleanup(jobId);
} catch (IOException e) {
- LOG.warn(
- "Failed to locally delete BLOB storage directory at "
- + jobDir.getAbsolutePath(),
- e);
+ exception = e;
}
- // delete in HA blob store files
- final boolean deletedHA = !cleanupBlobStoreFiles ||
blobStore.deleteAll(jobId);
+ if (!blobStore.deleteAll(jobId)) {
+ exception =
+ ExceptionUtils.firstOrSuppressed(
+ new FlinkException(
+ "Error while cleaning up the BlobStore
for job " + jobId),
+ exception);
+ }
- return deletedLocally && deletedHA;
+ ExceptionUtils.tryRethrowException(exception);
} finally {
readWriteLock.writeLock().unlock();
}
}
- public void retainJobs(Collection<JobID> jobsToRetain) throws IOException {
+ public void retainJobs(Collection<JobID> jobsToRetain) throws Exception {
Review comment:
       I reverted it, together with updating the `*alCleanup` method
signatures… 👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]