Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4238#discussion_r125217930
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -107,146 +133,268 @@ public BlobCache(
this.numFetchRetries = 0;
}
+ // Initializing the clean up task
+ this.cleanupTimer = new Timer(true);
+
+ cleanupInterval = blobClientConfig.getLong(
+ ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+ ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+ this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
+
// Add shutdown hook to delete storage directory
shutdownHook = BlobUtils.addShutdownHook(this, LOG);
}
+ @Override
+ public void registerJob(JobID jobId) {
+ synchronized (lockObject) {
+ RefCount ref = jobRefCounters.get(jobId);
+ if (ref == null) {
+ ref = new RefCount();
+ jobRefCounters.put(jobId, ref);
+ }
+ ++ref.references;
+ }
+ }
+
+ @Override
+ public void releaseJob(JobID jobId) {
+ synchronized (lockObject) {
+ RefCount ref = jobRefCounters.get(jobId);
+
+ if (ref == null) {
+ LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls");
+ return;
+ }
+
+ --ref.references;
+ if (ref.references == 0) {
+ ref.keepUntil = System.currentTimeMillis() + cleanupInterval;
+ }
+ }
+ }
+
+ /**
+ * Returns local copy of the (job-unrelated) file for the BLOB with the given key.
+ * <p>
+ * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
+ * the cache, the method will try to download it from this cache's BLOB server.
+ *
+ * @param key
+ * The key of the desired BLOB.
+ *
+ * @return file referring to the local storage location of the BLOB.
+ *
+ * @throws IOException
+ * Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
+ */
+ @Override
+ public File getFile(BlobKey key) throws IOException {
+ return getFileInternal(null, key);
+ }
+
/**
- * Returns the URL for the BLOB with the given key. The method will first attempt to serve
- * the BLOB from its local cache. If the BLOB is not in the cache, the method will try to download it
- * from this cache's BLOB server.
+ * Returns local copy of the file for the BLOB with the given key.
+ * <p>
+ * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
+ * the cache, the method will try to download it from this cache's BLOB server.
*
- * @param requiredBlob The key of the desired BLOB.
- * @return URL referring to the local storage location of the BLOB.
- * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
+ * @param jobId
+ * ID of the job this blob belongs to
+ * @param key
+ * The key of the desired BLOB.
+ *
+ * @return file referring to the local storage location of the BLOB.
+ *
+ * @throws IOException
+ * Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
*/
- public URL getURL(final BlobKey requiredBlob) throws IOException {
+ @Override
+ public File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException {
--- End diff --
I think the convention so far is that fields and parameters without an
annotation are considered `@Nonnull`, and only those annotated with `@Nullable`
can be `null`. If we annotate `jobId` explicitly anyway, then `key` should also
be marked as `@Nonnull`.
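
To illustrate the convention, here is a minimal sketch. It is not taken from the PR: the class and method names are hypothetical, `String` stands in for `JobID` and `BlobKey`, and a locally declared `@Nullable` replaces `javax.annotation.Nullable` so that the snippet compiles without extra dependencies.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Objects;

// Hypothetical illustration of the "non-null unless @Nullable" convention.
final class NullabilityConventionSketch {

    /** Local stand-in for javax.annotation.Nullable (assumption, for self-containment). */
    @Documented
    @Retention(RetentionPolicy.CLASS)
    @Target({ElementType.PARAMETER, ElementType.FIELD, ElementType.METHOD})
    @interface Nullable {}

    /** Public-facing variant: no annotations, so both parameters are non-null by convention. */
    static String describe(String jobId, String key) {
        Objects.requireNonNull(jobId, "jobId");
        Objects.requireNonNull(key, "key");
        return jobId + "/" + key;
    }

    /** Internal variant: only the parameter that may be null is annotated. */
    static String describeInternal(@Nullable String jobId, String key) {
        Objects.requireNonNull(key, "key");
        return (jobId == null ? "no-job" : jobId) + "/" + key;
    }

    public static void main(String[] args) {
        System.out.println(describe("job-1", "blob-a"));        // prints "job-1/blob-a"
        System.out.println(describeInternal(null, "blob-a"));   // prints "no-job/blob-a"
    }
}
```

With this style, an explicit `@Nonnull` on a single parameter adds no information, and a reader only has to look for `@Nullable` to find the places where `null` is a legal argument.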