Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4238#discussion_r125218256
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -107,146 +133,268 @@ public BlobCache(
this.numFetchRetries = 0;
}
+ // Initializing the clean up task
+ this.cleanupTimer = new Timer(true);
+
+ cleanupInterval = blobClientConfig.getLong(
+ ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+
ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+ this.cleanupTimer.schedule(this, cleanupInterval,
cleanupInterval);
+
// Add shutdown hook to delete storage directory
shutdownHook = BlobUtils.addShutdownHook(this, LOG);
}
+ @Override
+ public void registerJob(JobID jobId) {
+ synchronized (lockObject) {
+ RefCount ref = jobRefCounters.get(jobId);
+ if (ref == null) {
+ ref = new RefCount();
+ jobRefCounters.put(jobId, ref);
+ }
+ ++ref.references;
+ }
+ }
+
+ @Override
+ public void releaseJob(JobID jobId) {
+ synchronized (lockObject) {
+ RefCount ref = jobRefCounters.get(jobId);
+
+ if (ref == null) {
+ LOG.warn("improper use of releaseJob() without
a matching number of registerJob() calls");
+ return;
+ }
+
+ --ref.references;
+ if (ref.references == 0) {
+ ref.keepUntil = System.currentTimeMillis() +
cleanupInterval;
+ }
+ }
+ }
+
+ /**
+ * Returns local copy of the (job-unrelated) file for the BLOB with the
given key.
+ * <p>
+ * The method will first attempt to serve the BLOB from its local
cache. If the BLOB is not in
+ * the cache, the method will try to download it from this cache's BLOB
server.
+ *
+ * @param key
+ * The key of the desired BLOB.
+ *
+ * @return file referring to the local storage location of the BLOB.
+ *
+ * @throws IOException
+ * Thrown if an I/O error occurs while downloading the
BLOBs from the BLOB server.
+ */
+ @Override
+ public File getFile(BlobKey key) throws IOException {
+ return getFileInternal(null, key);
+ }
+
/**
- * Returns the URL for the BLOB with the given key. The method will
first attempt to serve
- * the BLOB from its local cache. If the BLOB is not in the cache, the
method will try to download it
- * from this cache's BLOB server.
+ * Returns local copy of the file for the BLOB with the given key.
+ * <p>
+ * The method will first attempt to serve the BLOB from its local
cache. If the BLOB is not in
+ * the cache, the method will try to download it from this cache's BLOB
server.
*
- * @param requiredBlob The key of the desired BLOB.
- * @return URL referring to the local storage location of the BLOB.
- * @throws IOException Thrown if an I/O error occurs while downloading
the BLOBs from the BLOB server.
+ * @param jobId
+ * ID of the job this blob belongs to
+ * @param key
+ * The key of the desired BLOB.
+ *
+ * @return file referring to the local storage location of the BLOB.
+ *
+ * @throws IOException
+ * Thrown if an I/O error occurs while downloading the
BLOBs from the BLOB server.
*/
- public URL getURL(final BlobKey requiredBlob) throws IOException {
+ @Override
+ public File getFile(@Nonnull JobID jobId, BlobKey key) throws
IOException {
+ checkNotNull(jobId);
+ return getFileInternal(jobId, key);
+ }
+
+ /**
+ * Returns local copy of the file for the BLOB with the given key.
+ * <p>
+ * The method will first attempt to serve the BLOB from its local
cache. If the BLOB is not in
+ * the cache, the method will try to download it from this cache's BLOB
server.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to (or <tt>null</tt> if
job-unrelated)
+ * @param requiredBlob
+ * The key of the desired BLOB.
+ *
+ * @return file referring to the local storage location of the BLOB.
+ *
+ * @throws IOException
+ * Thrown if an I/O error occurs while downloading the
BLOBs from the BLOB server.
+ */
+ private File getFileInternal(@Nullable JobID jobId, BlobKey
requiredBlob) throws IOException {
checkArgument(requiredBlob != null, "BLOB key cannot be null.");
- final File localJarFile =
BlobUtils.getStorageLocation(storageDir, requiredBlob);
+ final File localJarFile =
BlobUtils.getStorageLocation(storageDir, jobId, requiredBlob);
if (localJarFile.exists()) {
- return localJarFile.toURI().toURL();
+ return localJarFile;
}
// first try the distributed blob store (if available)
try {
- blobView.get(requiredBlob, localJarFile);
+ blobView.get(jobId, requiredBlob, localJarFile);
} catch (Exception e) {
LOG.info("Failed to copy from blob store. Downloading
from BLOB server instead.", e);
}
if (localJarFile.exists()) {
- return localJarFile.toURI().toURL();
+ return localJarFile;
}
// fallback: download from the BlobServer
final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
+ LOG.info("Downloading {}/{} from {}", jobId, requiredBlob,
serverAddress);
// loop over retries
int attempt = 0;
while (true) {
-
- if (attempt == 0) {
- LOG.info("Downloading {} from {}",
requiredBlob, serverAddress);
- } else {
- LOG.info("Downloading {} from {} (retry {})",
requiredBlob, serverAddress, attempt);
- }
-
- try {
- BlobClient bc = null;
- InputStream is = null;
- OutputStream os = null;
-
- try {
- bc = new BlobClient(serverAddress,
blobClientConfig);
- is = bc.get(requiredBlob);
- os = new FileOutputStream(localJarFile);
-
- while (true) {
- final int read = is.read(buf);
- if (read < 0) {
- break;
- }
- os.write(buf, 0, read);
- }
-
- // we do explicitly not use a finally
block, because we want the closing
- // in the regular case to throw
exceptions and cause the writing to fail.
- // But, the closing on exception should
not throw further exceptions and
- // let us keep the root exception
- os.close();
- os = null;
- is.close();
- is = null;
- bc.close();
- bc = null;
-
- // success, we finished
- return localJarFile.toURI().toURL();
- }
- catch (Throwable t) {
- // we use "catch (Throwable)" to keep
the root exception. Otherwise that exception
- // it would be replaced by any
exception thrown in the finally block
- IOUtils.closeQuietly(os);
- IOUtils.closeQuietly(is);
- IOUtils.closeQuietly(bc);
-
- if (t instanceof IOException) {
- throw (IOException) t;
- } else {
- throw new
IOException(t.getMessage(), t);
+ try (
+ final BlobClient bc = new
BlobClient(serverAddress, blobClientConfig);
+ final InputStream is = bc.getInternal(jobId,
requiredBlob);
+ final OutputStream os = new
FileOutputStream(localJarFile)
+ ) {
+ while (true) {
+ final int read = is.read(buf);
+ if (read < 0) {
+ break;
}
+ os.write(buf, 0, read);
}
+
+ // success, we finished
+ return localJarFile;
}
- catch (IOException e) {
- String message = "Failed to fetch BLOB " +
requiredBlob + " from " + serverAddress +
+ catch (Throwable t) {
+ String message = "Failed to fetch BLOB " +
jobId + "/" + requiredBlob + " from " + serverAddress +
" and store it under " +
localJarFile.getAbsolutePath();
if (attempt < numFetchRetries) {
- attempt++;
if (LOG.isDebugEnabled()) {
- LOG.debug(message + "
Retrying...", e);
+ LOG.debug(message + "
Retrying...", t);
} else {
LOG.error(message + "
Retrying...");
}
}
else {
- LOG.error(message + " No retries
left.", e);
- throw new IOException(message, e);
+ LOG.error(message + " No retries
left.", t);
+ throw new IOException(message, t);
}
+
+ // retry
+ ++attempt;
+ LOG.info("Downloading {}/{} from {} (retry
{})", jobId, requiredBlob, serverAddress, attempt);
}
} // end loop over retries
}
/**
- * Deletes the file associated with the given key from the BLOB cache.
- * @param key referring to the file to be deleted
+ * Deletes the (job-unrelated) file associated with the blob key in
this BLOB cache.
+ *
+ * @param key
+ * blob key associated with the file to be deleted
+ *
+ * @throws IOException
+ */
+ @Override
+ public void delete(BlobKey key) throws IOException {
+ deleteInternal(null, key);
+ }
+
+ /**
+ * Deletes the file associated with the blob key in this BLOB cache.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to
+ * @param key
+ * blob key associated with the file to be deleted
+ *
+ * @throws IOException
*/
- public void delete(BlobKey key) throws IOException{
- final File localFile = BlobUtils.getStorageLocation(storageDir,
key);
+ @Override
+ public void delete(@Nonnull JobID jobId, BlobKey key) throws
IOException {
+ checkNotNull(jobId);
+ deleteInternal(jobId, key);
+ }
- if (localFile.exists() && !localFile.delete()) {
- LOG.warn("Failed to delete locally cached BLOB " + key
+ " at " + localFile.getAbsolutePath());
+ /**
+ * Deletes the file associated with the blob key in this BLOB cache.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to (or <tt>null</tt> if
job-unrelated)
+ * @param key
+ * blob key associated with the file to be deleted
+ *
+ * @throws IOException
+ */
+ private void deleteInternal(@Nullable JobID jobId, BlobKey key) throws
IOException{
+ final File localFile = BlobUtils.getStorageLocation(storageDir,
jobId, key);
+ if (!localFile.delete() && localFile.exists()) {
+ LOG.warn("Failed to delete locally cached BLOB {} at
{}", key, localFile.getAbsolutePath());
}
}
/**
- * Deletes the file associated with the given key from the BLOB cache
and
+ * Deletes the (job-unrelated) file associated with the given key from
the BLOB cache and
* BLOB server.
*
- * @param key referring to the file to be deleted
+ * @param key
+ * referring to the file to be deleted
+ *
* @throws IOException
- * thrown if an I/O error occurs while transferring the request
to
- * the BLOB server or if the BLOB server cannot delete the file
+ * thrown if an I/O error occurs while transferring the
request to the BLOB server or if the
+ * BLOB server cannot delete the file
*/
public void deleteGlobal(BlobKey key) throws IOException {
+ deleteGlobalInternal(null, key);
+ }
+
+ /**
+ * Deletes the file associated with the given key from the BLOB cache
and BLOB server.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to
+ * @param key
+ * referring to the file to be deleted
+ *
+ * @throws IOException
+ * thrown if an I/O error occurs while transferring the
request to the BLOB server or if the
+ * BLOB server cannot delete the file
+ */
+ public void deleteGlobal(@Nonnull JobID jobId, BlobKey key) throws
IOException {
--- End diff --
Do we still want the cache to be able to delete files from the server? I
thought the server should make these kinds of decisions.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---