[
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188030#comment-16188030
]
ASF GitHub Bot commented on FLINK-7068:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4358#discussion_r142136764
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
@@ -407,4 +419,72 @@ static void closeSilently(Socket socket, Logger LOG) {
private BlobUtils() {
throw new RuntimeException();
}
+
+ /**
+ * Moves the temporary <tt>incomingFile</tt> to its permanent location
where it is available for
+ * use.
+ *
+ * @param incomingFile
+ * temporary file created during transfer
+ * @param jobId
+ * ID of the job this blob belongs to or <tt>null</tt> if
job-unrelated
+ * @param blobKey
+ * BLOB key identifying the file
+ * @param storageFile
+ * (local) file where the blob is/should be stored
+ * @param writeLock
+ * lock to acquire before doing the move
+ * @param log
+ * logger for debug information
+ * @param blobStore
+ * HA store (or <tt>null</tt> if unavailable)
+ *
+ * @throws IOException
+ * thrown if an I/O error occurs while moving the file or
uploading it to the HA store
+ */
+ static void moveTempFileToStore(
+ File incomingFile, @Nullable JobID jobId, BlobKey
blobKey, File storageFile,
+ Lock writeLock, Logger log, @Nullable BlobStore
blobStore) throws IOException {
+
+ writeLock.lock();
+
+ try {
+ // first check whether the file already exists
+ if (!storageFile.exists()) {
+ try {
+ // only move the file if it does not
yet exist
+ Files.move(incomingFile.toPath(),
storageFile.toPath());
+
+ incomingFile = null;
+
+ } catch (FileAlreadyExistsException ignored) {
+ log.warn("Detected concurrent file
modifications. This should only happen if multiple" +
+ "BlobServer use the same
storage directory.");
+ // we cannot be sure at this point
whether the file has already been uploaded to the blob
+ // store or not. Even if the blobStore
might shortly be in an inconsistent state, we have
--- End diff --
"we have to"
> change BlobService sub-classes for permanent and transient BLOBs
> ----------------------------------------------------------------
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc.
> which even does not have to be reflected by files.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)