[ 
https://issues.apache.org/jira/browse/BEAM-5036?focusedWorklogId=151349&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151349
 ]

ASF GitHub Bot logged work on BEAM-5036:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Oct/18 20:08
            Start Date: 04/Oct/18 20:08
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on a change in pull request 
#6289: [BEAM-5036] Optimize the FileBasedSink WriteOperation.moveToOutput()
URL: https://github.com/apache/beam/pull/6289#discussion_r222717084
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
 ##########
 @@ -322,8 +330,71 @@ public static void rename(
     if (srcToRename.isEmpty()) {
       return;
     }
-    getFileSystemInternal(srcToRename.iterator().next().getScheme())
-        .rename(srcToRename, destToRename);
+
+    boolean replaceExisting =
+        options.contains(MoveOptions.StandardMoveOptions.REPLACE_EXISTING) ? 
true : false;
+    rename(
+        getFileSystemInternal(srcToRename.iterator().next().getScheme()),
+        srcToRename,
+        destToRename,
+        replaceExisting);
+  }
+
+  /**
+   * Executes a rename of the src which all must exist using the provided 
filesystem.
+   *
+   * <p>If replaceExisting is enabled and filesystem throws {code 
FileAlreadyExistsException} then
+   * an attempt to delete the destination is made and the rename is retried. 
Some filesystem
+   * implementations may apply this automatically without throwing.
+   *
+   * @param fileSystem The filesystem in use
+   * @param srcResourceIds The source resources to move
+   * @param destResourceIds The destinations for the sources to move to (must 
be same length as
+   *     srcResourceIds)
+   * @param replaceExisting If existing files in destination should be 
overwritten
+   * @throws IOException If the rename could not be completed
+   */
+  @VisibleForTesting
+  static void rename(
+      FileSystem fileSystem,
+      List<ResourceId> srcResourceIds,
+      List<ResourceId> destResourceIds,
+      boolean replaceExisting)
+      throws IOException {
+    try {
+      fileSystem.rename(srcResourceIds, destResourceIds);
+    } catch (FileAlreadyExistsException e) {
 
 Review comment:
   According to FileSystem.rename() we should be catching 
java.io.FileNotFoundException instead of 
java.nio.file.FileAlreadyExistsException.
   
   Also, I'm not 100% sure if GCS will throw the proper exception here 
(java.io.FileNotFoundException).
   Unfortunately we cannot properly test this in unittests unless we manually 
introduce failures and connect to GCS service. Can you try executing 
FileSystems.rename() operation for a batch of files to make sure that operation 
will succeed in following cases for GCS (and other file-systems but probably 
you already tested this). Otherwise, this could end up being a major regression 
for Dataflow since bundle re-execution will not operate properly.
   
   (1) FileSystems.rename() with 
MoveOptions.StandardMoveOptions.REPLACE_EXISTING for a set of files where some 
of the files in the destination set are already available.
   (2) FileSystems.rename() with MoveOptions.StandardMoveOptions. 
IGNORE_MISSING_FILES for a set of files where some of the source files are not 
available.
   (3) FileSystems.rename() with both 
MoveOptions.StandardMoveOptions.REPLACE_EXISTING and 
MoveOptions.StandardMoveOptions. IGNORE_MISSING_FILES for a set of files where 
both above conditions are true.
   
   (As before, please let me know if you need help with GCS/Dataflow).
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 151349)
    Time Spent: 10.5h  (was: 10h 20m)

> Optimize FileBasedSink's WriteOperation.moveToOutput()
> ------------------------------------------------------
>
>                 Key: BEAM-5036
>                 URL: https://issues.apache.org/jira/browse/BEAM-5036
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-files
>    Affects Versions: 2.5.0
>            Reporter: Jozef Vilcek
>            Assignee: Tim Robertson
>            Priority: Major
>             Fix For: 2.8.0
>
>          Time Spent: 10.5h
>  Remaining Estimate: 0h
>
> moveToOutput() methods in FileBasedSink.WriteOperation implements move by 
> copy+delete. It would be better to use a rename() which can be much more 
> effective for some filesystems.
> Filesystem must support cross-directory rename. BEAM-4861 is related to this 
> for the case of HDFS filesystem.
> Feature was discussed here:
> http://mail-archives.apache.org/mod_mbox/beam-dev/201807.mbox/%3CCAF9t7_4Mp54pQ+vRrJrBh9Vx0=uaknupzd_qdh_qdm9vxll...@mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to