[
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134688#comment-16134688
]
ASF GitHub Bot commented on FLINK-7057:
---------------------------------------
Github user tedyu commented on a diff in the pull request:
https://github.com/apache/flink/pull/4238#discussion_r134142624
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -108,11 +139,63 @@ public BlobCache(
this.numFetchRetries = 0;
}
+ // Initializing the clean up task
+ this.cleanupTimer = new Timer(true);
+
+ cleanupInterval =
blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
+ this.cleanupTimer.schedule(this, cleanupInterval,
cleanupInterval);
+
// Add shutdown hook to delete storage directory
shutdownHook = BlobUtils.addShutdownHook(this, LOG);
}
/**
+ * Registers use of job-related BLOBs.
+ * <p>
+ * Using any other method to access BLOBs, e.g. {@link #getFile}, is
only valid within calls
+ * to {@link #registerJob(JobID)} and {@link #releaseJob(JobID)}.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to
+ *
+ * @see #releaseJob(JobID)
+ */
+ public void registerJob(JobID jobId) {
+ synchronized (jobRefCounters) {
+ RefCount ref = jobRefCounters.get(jobId);
+ if (ref == null) {
+ ref = new RefCount();
+ jobRefCounters.put(jobId, ref);
+ }
+ ++ref.references;
+ }
+ }
+
+ /**
+ * Unregisters use of job-related BLOBs and allow them to be released.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to
+ *
+ * @see #registerJob(JobID)
+ */
+ public void releaseJob(JobID jobId) {
+ synchronized (jobRefCounters) {
+ RefCount ref = jobRefCounters.get(jobId);
+
+ if (ref == null) {
+ LOG.warn("improper use of releaseJob() without
a matching number of registerJob() calls");
--- End diff --
Including jobId would help troubleshooting.
> move BLOB ref-counting from LibraryCacheManager to BlobCache
> ------------------------------------------------------------
>
> Key: FLINK-7057
> URL: https://issues.apache.org/jira/browse/FLINK-7057
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Fix For: 1.4.0
>
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}}
> level but rather per job. Therefore, the cleanup process should be adapted,
> too.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)