[
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072338#comment-16072338
]
ASF GitHub Bot commented on FLINK-7057:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4238#discussion_r125217930
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -107,146 +133,268 @@ public BlobCache(
this.numFetchRetries = 0;
}
+ // Initializing the clean up task
+ this.cleanupTimer = new Timer(true);
+
+ cleanupInterval = blobClientConfig.getLong(
+ ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+ ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+ this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
+
// Add shutdown hook to delete storage directory
shutdownHook = BlobUtils.addShutdownHook(this, LOG);
}
+ @Override
+ public void registerJob(JobID jobId) {
+ synchronized (lockObject) {
+ RefCount ref = jobRefCounters.get(jobId);
+ if (ref == null) {
+ ref = new RefCount();
+ jobRefCounters.put(jobId, ref);
+ }
+ ++ref.references;
+ }
+ }
+
+ @Override
+ public void releaseJob(JobID jobId) {
+ synchronized (lockObject) {
+ RefCount ref = jobRefCounters.get(jobId);
+
+ if (ref == null) {
+ LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls");
+ return;
+ }
+
+ --ref.references;
+ if (ref.references == 0) {
+ ref.keepUntil = System.currentTimeMillis() + cleanupInterval;
+ }
+ }
+ }
+
+ /**
+ * Returns local copy of the (job-unrelated) file for the BLOB with the given key.
+ * <p>
+ * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
+ * the cache, the method will try to download it from this cache's BLOB server.
+ *
+ * @param key
+ * The key of the desired BLOB.
+ *
+ * @return file referring to the local storage location of the BLOB.
+ *
+ * @throws IOException
+ * Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
+ */
+ @Override
+ public File getFile(BlobKey key) throws IOException {
+ return getFileInternal(null, key);
+ }
+
/**
- * Returns the URL for the BLOB with the given key. The method will first attempt to serve
- * the BLOB from its local cache. If the BLOB is not in the cache, the method will try to download it
- * from this cache's BLOB server.
+ * Returns local copy of the file for the BLOB with the given key.
+ * <p>
+ * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
+ * the cache, the method will try to download it from this cache's BLOB server.
*
- * @param requiredBlob The key of the desired BLOB.
- * @return URL referring to the local storage location of the BLOB.
- * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
+ * @param jobId
+ * ID of the job this blob belongs to
+ * @param key
+ * The key of the desired BLOB.
+ *
+ * @return file referring to the local storage location of the BLOB.
+ *
+ * @throws IOException
+ * Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
*/
- public URL getURL(final BlobKey requiredBlob) throws IOException {
+ @Override
+ public File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException {
--- End diff --
I think the convention so far is that fields and parameters without an
annotation are considered `@Nonnull`, and only those annotated with `@Nullable`
can be `null`. Under that convention the `@Nonnull` on `jobId` is redundant; if
we keep it, `key` should also be marked as `@Nonnull`.
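
To illustrate the convention described in this comment, here is a minimal sketch. It is not code from the pull request: the class `NullabilityConvention`, the method `describe`, and the use of `String` in place of `JobID` and `BlobKey` are all hypothetical, and the nested `@Nullable` annotation is only a stand-in for `javax.annotation.Nullable` so that the example compiles without extra dependencies.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.util.Objects;

class NullabilityConvention {

    /** Hypothetical stand-in for javax.annotation.Nullable. */
    @Target({ElementType.PARAMETER, ElementType.FIELD, ElementType.METHOD})
    @interface Nullable {}

    /**
     * Only jobId is annotated, so only jobId may be null.
     * key carries no annotation and is therefore treated as non-null.
     */
    static String describe(@Nullable String jobId, String key) {
        // Fail fast if a caller violates the implicit non-null contract.
        Objects.requireNonNull(key, "key");
        String owner = (jobId == null) ? "job-unrelated" : "job " + jobId;
        return owner + " / " + key;
    }
}
```

With this style, an explicit `@Nonnull` appears nowhere; the absence of `@Nullable` already carries that meaning, which is why annotating one parameter but not the other reads as inconsistent.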
> move BLOB ref-counting from LibraryCacheManager to BlobCache
> ------------------------------------------------------------
>
> Key: FLINK-7057
> URL: https://issues.apache.org/jira/browse/FLINK-7057
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}}
> level but rather per job. Therefore, the cleanup process should be adapted,
> too.
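
The per-job ref-counting with delayed cleanup that the issue describes can be sketched roughly as follows. This is an illustration under stated assumptions, not the `BlobCache` implementation: the class `JobRefCounter` and the methods `cleanup` and `isTracked` are hypothetical, job IDs are plain strings rather than `JobID`, and the current time is passed in explicitly instead of being read from `System.currentTimeMillis()` so that the behaviour is easy to check.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

class JobRefCounter {

    private static final class RefCount {
        int references = 0;
        long keepUntil = -1; // -1 means the job is still referenced
    }

    private final Object lock = new Object();
    private final Map<String, RefCount> jobRefCounters = new HashMap<>();
    private final long cleanupIntervalMillis;

    JobRefCounter(long cleanupIntervalMillis) {
        this.cleanupIntervalMillis = cleanupIntervalMillis;
    }

    void registerJob(String jobId) {
        synchronized (lock) {
            RefCount ref = jobRefCounters.computeIfAbsent(jobId, k -> new RefCount());
            ref.references++;
            ref.keepUntil = -1; // a re-registered job is no longer a cleanup candidate
        }
    }

    void releaseJob(String jobId, long now) {
        synchronized (lock) {
            RefCount ref = jobRefCounters.get(jobId);
            if (ref == null || ref.references == 0) {
                return; // unmatched release; ignored in this sketch
            }
            if (--ref.references == 0) {
                // Keep the job's files for one more interval before cleanup.
                ref.keepUntil = now + cleanupIntervalMillis;
            }
        }
    }

    /** Drops unreferenced jobs whose grace period has passed; returns how many were dropped. */
    int cleanup(long now) {
        synchronized (lock) {
            int removed = 0;
            Iterator<Map.Entry<String, RefCount>> it = jobRefCounters.entrySet().iterator();
            while (it.hasNext()) {
                RefCount ref = it.next().getValue();
                if (ref.references == 0 && ref.keepUntil >= 0 && now >= ref.keepUntil) {
                    it.remove();
                    removed++;
                }
            }
            return removed;
        }
    }

    boolean isTracked(String jobId) {
        synchronized (lock) {
            return jobRefCounters.containsKey(jobId);
        }
    }
}
```

In a real cache, `cleanup` would also delete the job's files and would be driven by a periodic timer; both are left out here to keep the counting logic visible.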