[
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188026#comment-16188026
]
ASF GitHub Bot commented on FLINK-7068:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4358#discussion_r142129505
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -120,6 +125,76 @@ public BlobClient(InetSocketAddress serverAddress, Configuration clientConfig) t
         }
     }
+    /**
+     * Downloads the given BLOB from the given server and stores its contents to a (local) file.
+     *
+     * <p>Transient BLOB files are deleted after a successful copy of the server's data into the
+     * given <tt>localJarFile</tt>.
+     *
+     * @param jobId
+     *         job ID the BLOB belongs to or <tt>null</tt> if job-unrelated
+     * @param blobKey
+     *         BLOB key
+     * @param localJarFile
+     *         the local file to write to
+     * @param serverAddress
+     *         address of the server to download from
+     * @param blobClientConfig
+     *         client configuration for the connection
+     * @param numFetchRetries
+     *         number of retries before failing
+     *
+     * @throws IOException
+     *         if an I/O error occurs during the download
+     */
+    static void downloadFromBlobServer(
+            @Nullable JobID jobId, BlobKey blobKey, File localJarFile,
+            InetSocketAddress serverAddress, Configuration blobClientConfig, int numFetchRetries)
+            throws IOException {
+
+        final byte[] buf = new byte[BUFFER_SIZE];
+        LOG.info("Downloading {}/{} from {}", jobId, blobKey, serverAddress);
+
+        // loop over retries
+        int attempt = 0;
+        while (true) {
+            try (
+                final BlobClient bc = new BlobClient(serverAddress, blobClientConfig);
+                final InputStream is = bc.getInternal(jobId, blobKey);
+                final OutputStream os = new FileOutputStream(localJarFile)
+            ) {
+                while (true) {
+                    final int read = is.read(buf);
+                    if (read < 0) {
+                        break;
+                    }
+                    os.write(buf, 0, read);
+                }
+
+                return;
+            }
+            catch (Throwable t) {
+                String message = "Failed to fetch BLOB " + jobId + "/" + blobKey + " from " + serverAddress +
+                    " and store it under " + localJarFile.getAbsolutePath();
+                if (attempt < numFetchRetries) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(message + " Retrying...", t);
--- End diff --
Shouldn't this also be logged as an error if the other log statement is an error as well?
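
For illustration only, here is a minimal, self-contained sketch of the retry loop being discussed. It is not the code from the pull request: the class name `RetryLoggingSketch`, the in-memory `LOG_LINES` list, the `retryLevel` parameter, and the generic `Callable` standing in for the actual BLOB download are all assumptions made so the example runs without Flink or a logging library. Unlike the diff, it catches `Exception` rather than `Throwable`. It shows where the two log statements sit, so the level used for the "Retrying..." message can be compared with the error logged once retries are exhausted.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class RetryLoggingSketch {

    /** Log lines are collected in memory (hypothetical stand-in for a real logger). */
    static final List<String> LOG_LINES = new ArrayList<>();

    static void log(String level, String message) {
        LOG_LINES.add(level + " " + message);
    }

    /**
     * Runs the given fetch, retrying up to numFetchRetries times.
     *
     * @param fetch           the operation to attempt (stands in for the BLOB download)
     * @param numFetchRetries number of retries before failing
     * @param retryLevel      level for the "Retrying..." message, e.g. "DEBUG" or "ERROR"
     */
    static <T> T fetchWithRetries(Callable<T> fetch, int numFetchRetries, String retryLevel)
            throws IOException {
        int attempt = 0;
        while (true) {
            try {
                return fetch.call();
            } catch (Exception e) {
                String message = "Failed to fetch BLOB (attempt " + (attempt + 1) + ").";
                if (attempt < numFetchRetries) {
                    // Intermediate failure: this is the statement the review comment asks about.
                    log(retryLevel, message + " Retrying...");
                } else {
                    // Final failure: logged as an error and propagated to the caller.
                    log("ERROR", message + " No retries left.");
                    throw new IOException(message, e);
                }
                ++attempt;
            }
        }
    }
}
```

Calling `fetchWithRetries(fetch, 3, "DEBUG")` mirrors the diff as posted, where intermediate failures are only visible at debug level; passing `"ERROR"` instead mirrors the suggestion in this comment.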
> change BlobService sub-classes for permanent and transient BLOBs
> ----------------------------------------------------------------
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should cover use cases for BLOBs that are
> permanently stored for a job's lifetime (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc.,
> which do not even have to be backed by files.