GitHub user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/3512#discussion_r106674262
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -180,91 +180,159 @@ public URL getURL(final BlobKey requiredBlob) throws
IOException {
// fallback: download from the BlobServer
final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
+ LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
// loop over retries
int attempt = 0;
while (true) {
+ try (
+ final BlobClient bc = new
BlobClient(serverAddress, blobClientConfig);
+ final InputStream is = bc.get(requiredBlob);
+ final OutputStream os = new
FileOutputStream(localJarFile)
+ ) {
+ getURLTransferFile(buf, is, os);
+
+ // success, we finished
+ return localJarFile.toURI().toURL();
+ }
+ catch (Throwable t) {
+ getURLOnException(requiredBlob.toString(),
localJarFile, attempt, t);
- if (attempt == 0) {
- LOG.info("Downloading {} from {}",
requiredBlob, serverAddress);
- } else {
+ // retry
+ ++attempt;
LOG.info("Downloading {} from {} (retry {})",
requiredBlob, serverAddress, attempt);
}
+ } // end loop over retries
+ }
- try {
- BlobClient bc = null;
- InputStream is = null;
- OutputStream os = null;
+ /**
+ * Returns the URL for the BLOB with the given parameters. The method
will first attempt to
+ * serve the BLOB from its local cache. If the BLOB is not in the
cache, the method will try
+ * to download it from this cache's BLOB server.
+ *
+ * @param jobId JobID of the file in the blob store
+ * @param key String key of the file in the blob store
+ * @return URL referring to the local storage location of the BLOB.
+ * @throws java.io.FileNotFoundException if the path does not exist;
+ * @throws IOException Thrown if an I/O error occurs while downloading
the BLOBs from the BLOB server.
+ */
+ public URL getURL(final JobID jobId, final String key) throws
IOException {
+ checkArgument(jobId != null, "Job id cannot be null.");
+ checkArgument(key != null, "BLOB name cannot be null.");
- try {
- bc = new BlobClient(serverAddress,
blobClientConfig);
- is = bc.get(requiredBlob);
- os = new FileOutputStream(localJarFile);
-
- while (true) {
- final int read = is.read(buf);
- if (read < 0) {
- break;
- }
- os.write(buf, 0, read);
- }
-
- // we do explicitly not use a finally
block, because we want the closing
- // in the regular case to throw
exceptions and cause the writing to fail.
- // But, the closing on exception should
not throw further exceptions and
- // let us keep the root exception
- os.close();
- os = null;
- is.close();
- is = null;
- bc.close();
- bc = null;
-
- // success, we finished
- return localJarFile.toURI().toURL();
- }
- catch (Throwable t) {
- // we use "catch (Throwable)" to keep
the root exception. Otherwise that exception
- // it would be replaced by any
exception thrown in the finally block
- IOUtils.closeQuietly(os);
- IOUtils.closeQuietly(is);
- IOUtils.closeQuietly(bc);
-
- if (t instanceof IOException) {
- throw (IOException) t;
- } else {
- throw new
IOException(t.getMessage(), t);
- }
- }
+ final File localJarFile =
BlobUtils.getStorageLocation(storageDir, jobId, key);
+
+ if (localJarFile.exists()) {
+ return localJarFile.toURI().toURL();
+ }
+
+ // first try the distributed blob store (if available)
+ try {
+ blobStore.get(jobId, key, localJarFile);
+ } catch (Exception e) {
+ LOG.info("Failed to copy from blob store. Downloading
from BLOB server instead.", e);
+ }
+
+ if (localJarFile.exists()) {
+ return localJarFile.toURI().toURL();
+ }
+
+ // fallback: download from the BlobServer
+ final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
+ LOG.info("Downloading {}/{} from {}", jobId, key,
serverAddress);
+
+ // loop over retries
+ int attempt = 0;
+ while (true) {
+ try (
+ final BlobClient bc = new
BlobClient(serverAddress, blobClientConfig);
+ final InputStream is = bc.get(jobId, key);
+ final OutputStream os = new
FileOutputStream(localJarFile)
+ ) {
+ getURLTransferFile(buf, is, os);
+
+ // success, we finished
+ return localJarFile.toURI().toURL();
}
- catch (IOException e) {
- String message = "Failed to fetch BLOB " +
requiredBlob + " from " + serverAddress +
- " and store it under " +
localJarFile.getAbsolutePath();
- if (attempt < numFetchRetries) {
- attempt++;
- if (LOG.isDebugEnabled()) {
- LOG.debug(message + "
Retrying...", e);
- } else {
- LOG.error(message + "
Retrying...");
- }
- }
- else {
- LOG.error(message + " No retries
left.", e);
- throw new IOException(message, e);
- }
+ catch (Throwable t) {
+ getURLOnException(String.format("%s/%s", jobId,
key), localJarFile, attempt, t);
+
+ // retry
+ ++attempt;
+ LOG.info("Downloading {}/{} from {} (retry
{})", jobId, key, serverAddress, attempt);
}
} // end loop over retries
}
+ private static void getURLTransferFile(
+ final byte[] buf, final InputStream is, final
OutputStream os) throws IOException {
+ while (true) {
+ final int read = is.read(buf);
+ if (read < 0) {
+ break;
+ }
+ os.write(buf, 0, read);
+ }
+ }
+
+ private final void getURLOnException(
+ final String requiredBlob, final File localJarFile,
final int attempt,
+ final Throwable t) throws IOException {
+ String message = "Failed to fetch BLOB " + requiredBlob + "
from " + serverAddress +
+ " and store it under " + localJarFile.getAbsolutePath();
+ if (attempt < numFetchRetries) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(message + " Retrying...", t);
+ } else {
+ LOG.error(message + " Retrying...");
+ }
+ }
+ else {
+ LOG.error(message + " No retries left.", t);
+ throw new IOException(message, t);
+ }
+ }
+
/**
* Deletes the file associated with the given key from the BLOB cache.
+ *
* @param key referring to the file to be deleted
*/
- public void delete(BlobKey key) throws IOException{
+ @Override
+ public void delete(BlobKey key) {
final File localFile = BlobUtils.getStorageLocation(storageDir,
key);
if (localFile.exists() && !localFile.delete()) {
- LOG.warn("Failed to delete locally cached BLOB " + key
+ " at " + localFile.getAbsolutePath());
+ LOG.warn("Failed to delete locally cached BLOB {} at
{}" + key, localFile.getAbsolutePath());
--- End diff --
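The `+` is wrong here, I think: the message already uses `{}` placeholders, so concatenating `key` onto the format string leaves only one argument for two placeholders. The first `{}` would get the path, and the key would be appended after a literal `{}`. Pass `key` as a separate argument instead: `LOG.warn("Failed to delete locally cached BLOB {} at {}", key, localFile.getAbsolutePath());`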
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---