[
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16073226#comment-16073226
]
ASF GitHub Bot commented on FLINK-7057:
---------------------------------------
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4238#discussion_r125404358
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -107,146 +133,268 @@ public BlobCache(
this.numFetchRetries = 0;
}
+ // Initializing the clean up task
+ this.cleanupTimer = new Timer(true);
+
+ cleanupInterval = blobClientConfig.getLong(
+ ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+
ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+ this.cleanupTimer.schedule(this, cleanupInterval,
cleanupInterval);
+
// Add shutdown hook to delete storage directory
shutdownHook = BlobUtils.addShutdownHook(this, LOG);
}
+ @Override
+ public void registerJob(JobID jobId) {
+ synchronized (lockObject) {
+ RefCount ref = jobRefCounters.get(jobId);
+ if (ref == null) {
+ ref = new RefCount();
+ jobRefCounters.put(jobId, ref);
+ }
+ ++ref.references;
+ }
+ }
+
+ @Override
+ public void releaseJob(JobID jobId) {
+ synchronized (lockObject) {
+ RefCount ref = jobRefCounters.get(jobId);
+
+ if (ref == null) {
+ LOG.warn("improper use of releaseJob() without
a matching number of registerJob() calls");
+ return;
+ }
+
+ --ref.references;
+ if (ref.references == 0) {
+ ref.keepUntil = System.currentTimeMillis() +
cleanupInterval;
+ }
+ }
+ }
+
+ /**
+ * Returns local copy of the (job-unrelated) file for the BLOB with the
given key.
+ * <p>
+ * The method will first attempt to serve the BLOB from its local
cache. If the BLOB is not in
+ * the cache, the method will try to download it from this cache's BLOB
server.
+ *
+ * @param key
+ * The key of the desired BLOB.
+ *
+ * @return file referring to the local storage location of the BLOB.
+ *
+ * @throws IOException
+ * Thrown if an I/O error occurs while downloading the
BLOBs from the BLOB server.
+ */
+ @Override
+ public File getFile(BlobKey key) throws IOException {
+ return getFileInternal(null, key);
+ }
+
/**
- * Returns the URL for the BLOB with the given key. The method will
first attempt to serve
- * the BLOB from its local cache. If the BLOB is not in the cache, the
method will try to download it
- * from this cache's BLOB server.
+ * Returns local copy of the file for the BLOB with the given key.
+ * <p>
+ * The method will first attempt to serve the BLOB from its local
cache. If the BLOB is not in
+ * the cache, the method will try to download it from this cache's BLOB
server.
*
- * @param requiredBlob The key of the desired BLOB.
- * @return URL referring to the local storage location of the BLOB.
- * @throws IOException Thrown if an I/O error occurs while downloading
the BLOBs from the BLOB server.
+ * @param jobId
+ * ID of the job this blob belongs to
+ * @param key
+ * The key of the desired BLOB.
+ *
+ * @return file referring to the local storage location of the BLOB.
+ *
+ * @throws IOException
+ * Thrown if an I/O error occurs while downloading the
BLOBs from the BLOB server.
*/
- public URL getURL(final BlobKey requiredBlob) throws IOException {
+ @Override
+ public File getFile(@Nonnull JobID jobId, BlobKey key) throws
IOException {
+ checkNotNull(jobId);
+ return getFileInternal(jobId, key);
+ }
+
+ /**
+ * Returns local copy of the file for the BLOB with the given key.
+ * <p>
+ * The method will first attempt to serve the BLOB from its local
cache. If the BLOB is not in
+ * the cache, the method will try to download it from this cache's BLOB
server.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to (or <tt>null</tt> if
job-unrelated)
+ * @param requiredBlob
+ * The key of the desired BLOB.
+ *
+ * @return file referring to the local storage location of the BLOB.
+ *
+ * @throws IOException
+ * Thrown if an I/O error occurs while downloading the
BLOBs from the BLOB server.
+ */
+ private File getFileInternal(@Nullable JobID jobId, BlobKey
requiredBlob) throws IOException {
checkArgument(requiredBlob != null, "BLOB key cannot be null.");
- final File localJarFile =
BlobUtils.getStorageLocation(storageDir, requiredBlob);
+ final File localJarFile =
BlobUtils.getStorageLocation(storageDir, jobId, requiredBlob);
if (localJarFile.exists()) {
- return localJarFile.toURI().toURL();
+ return localJarFile;
}
// first try the distributed blob store (if available)
try {
- blobView.get(requiredBlob, localJarFile);
+ blobView.get(jobId, requiredBlob, localJarFile);
} catch (Exception e) {
LOG.info("Failed to copy from blob store. Downloading
from BLOB server instead.", e);
}
if (localJarFile.exists()) {
- return localJarFile.toURI().toURL();
+ return localJarFile;
}
// fallback: download from the BlobServer
final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
+ LOG.info("Downloading {}/{} from {}", jobId, requiredBlob,
serverAddress);
// loop over retries
int attempt = 0;
while (true) {
-
- if (attempt == 0) {
- LOG.info("Downloading {} from {}",
requiredBlob, serverAddress);
- } else {
- LOG.info("Downloading {} from {} (retry {})",
requiredBlob, serverAddress, attempt);
- }
-
- try {
- BlobClient bc = null;
- InputStream is = null;
- OutputStream os = null;
-
- try {
- bc = new BlobClient(serverAddress,
blobClientConfig);
- is = bc.get(requiredBlob);
- os = new FileOutputStream(localJarFile);
-
- while (true) {
- final int read = is.read(buf);
- if (read < 0) {
- break;
- }
- os.write(buf, 0, read);
- }
-
- // we do explicitly not use a finally
block, because we want the closing
- // in the regular case to throw
exceptions and cause the writing to fail.
- // But, the closing on exception should
not throw further exceptions and
- // let us keep the root exception
- os.close();
- os = null;
- is.close();
- is = null;
- bc.close();
- bc = null;
-
- // success, we finished
- return localJarFile.toURI().toURL();
- }
- catch (Throwable t) {
- // we use "catch (Throwable)" to keep
the root exception. Otherwise that exception
- // it would be replaced by any
exception thrown in the finally block
- IOUtils.closeQuietly(os);
- IOUtils.closeQuietly(is);
- IOUtils.closeQuietly(bc);
-
- if (t instanceof IOException) {
- throw (IOException) t;
- } else {
- throw new
IOException(t.getMessage(), t);
+ try (
+ final BlobClient bc = new
BlobClient(serverAddress, blobClientConfig);
+ final InputStream is = bc.getInternal(jobId,
requiredBlob);
+ final OutputStream os = new
FileOutputStream(localJarFile)
+ ) {
+ while (true) {
+ final int read = is.read(buf);
+ if (read < 0) {
+ break;
}
+ os.write(buf, 0, read);
}
+
+ // success, we finished
+ return localJarFile;
}
- catch (IOException e) {
- String message = "Failed to fetch BLOB " +
requiredBlob + " from " + serverAddress +
+ catch (Throwable t) {
+ String message = "Failed to fetch BLOB " +
jobId + "/" + requiredBlob + " from " + serverAddress +
" and store it under " +
localJarFile.getAbsolutePath();
if (attempt < numFetchRetries) {
- attempt++;
if (LOG.isDebugEnabled()) {
- LOG.debug(message + "
Retrying...", e);
+ LOG.debug(message + "
Retrying...", t);
} else {
LOG.error(message + "
Retrying...");
}
}
else {
- LOG.error(message + " No retries
left.", e);
- throw new IOException(message, e);
+ LOG.error(message + " No retries
left.", t);
+ throw new IOException(message, t);
}
+
+ // retry
+ ++attempt;
+ LOG.info("Downloading {}/{} from {} (retry
{})", jobId, requiredBlob, serverAddress, attempt);
}
} // end loop over retries
}
/**
- * Deletes the file associated with the given key from the BLOB cache.
- * @param key referring to the file to be deleted
+ * Deletes the (job-unrelated) file associated with the blob key in
this BLOB cache.
+ *
+ * @param key
+ * blob key associated with the file to be deleted
+ *
+ * @throws IOException
+ */
+ @Override
+ public void delete(BlobKey key) throws IOException {
+ deleteInternal(null, key);
+ }
+
+ /**
+ * Deletes the file associated with the blob key in this BLOB cache.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to
+ * @param key
+ * blob key associated with the file to be deleted
+ *
+ * @throws IOException
*/
- public void delete(BlobKey key) throws IOException{
- final File localFile = BlobUtils.getStorageLocation(storageDir,
key);
+ @Override
+ public void delete(@Nonnull JobID jobId, BlobKey key) throws
IOException {
+ checkNotNull(jobId);
+ deleteInternal(jobId, key);
+ }
- if (localFile.exists() && !localFile.delete()) {
- LOG.warn("Failed to delete locally cached BLOB " + key
+ " at " + localFile.getAbsolutePath());
+ /**
+ * Deletes the file associated with the blob key in this BLOB cache.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to (or <tt>null</tt> if
job-unrelated)
+ * @param key
+ * blob key associated with the file to be deleted
+ *
+ * @throws IOException
+ */
+ private void deleteInternal(@Nullable JobID jobId, BlobKey key) throws
IOException{
+ final File localFile = BlobUtils.getStorageLocation(storageDir,
jobId, key);
+ if (!localFile.delete() && localFile.exists()) {
+ LOG.warn("Failed to delete locally cached BLOB {} at
{}", key, localFile.getAbsolutePath());
}
}
/**
- * Deletes the file associated with the given key from the BLOB cache
and
+ * Deletes the (job-unrelated) file associated with the given key from
the BLOB cache and
* BLOB server.
*
- * @param key referring to the file to be deleted
+ * @param key
+ * referring to the file to be deleted
+ *
* @throws IOException
- * thrown if an I/O error occurs while transferring the request
to
- * the BLOB server or if the BLOB server cannot delete the file
+ * thrown if an I/O error occurs while transferring the
request to the BLOB server or if the
+ * BLOB server cannot delete the file
*/
public void deleteGlobal(BlobKey key) throws IOException {
+ deleteGlobalInternal(null, key);
+ }
+
+ /**
+ * Deletes the file associated with the given key from the BLOB cache
and BLOB server.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to
+ * @param key
+ * referring to the file to be deleted
+ *
+ * @throws IOException
+ * thrown if an I/O error occurs while transferring the
request to the BLOB server or if the
+ * BLOB server cannot delete the file
+ */
+ public void deleteGlobal(@Nonnull JobID jobId, BlobKey key) throws
IOException {
--- End diff --
This will change in a future PR (FLIP-19 is not yet complete with this one).
> move BLOB ref-counting from LibraryCacheManager to BlobCache
> ------------------------------------------------------------
>
> Key: FLINK-7057
> URL: https://issues.apache.org/jira/browse/FLINK-7057
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} does some ref-counting for the JAR
> files it manages. Instead, we want the {{BlobCache}} to do that itself for
> all job-related BLOBs. Also, ref-counting should not operate per {{BlobKey}}
> but rather per job. Therefore, the cleanup process should be adapted,
> too.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)