[ https://issues.apache.org/jira/browse/BEAM-5036?focusedWorklogId=160662&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-160662 ]

ASF GitHub Bot logged work on BEAM-5036:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Oct/18 14:38
            Start Date: 30/Oct/18 14:38
    Worklog Time Spent: 10m 
      Work Description: timrobertson100 closed pull request #6843: [BEAM-5036] Optimize the FileBasedSink WriteOperation.moveToOutput()
URL: https://github.com/apache/beam/pull/6843
 
 
   

This PR was merged from a forked repository.
Because GitHub hides the original diff on merge, the diff is reproduced below
for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 92b2382e365..62d52beec15 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -758,7 +758,7 @@ final void moveToOutputFiles(
       }
       // During a failure case, files may have been deleted in an earlier 
step. Thus
       // we ignore missing files here.
-      FileSystems.copy(srcFiles, dstFiles, 
StandardMoveOptions.IGNORE_MISSING_FILES);
+      FileSystems.rename(srcFiles, dstFiles, 
StandardMoveOptions.IGNORE_MISSING_FILES);
       removeTemporaryFiles(srcFiles);
     }
 
diff --git 
a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
 
b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
index b08a70fc943..a1cee0bbd93 100644
--- 
a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ 
b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -75,6 +75,7 @@
   private static final Logger LOG = 
LoggerFactory.getLogger(HadoopFileSystem.class);
 
   @VisibleForTesting static final String LOG_CREATE_DIRECTORY = "Creating 
directory %s";
+  @VisibleForTesting static final String LOG_DELETING_EXISTING_FILE = 
"Deleting existing file %s";
   @VisibleForTesting final org.apache.hadoop.fs.FileSystem fileSystem;
 
   HadoopFileSystem(Configuration configuration) throws IOException {
@@ -169,7 +170,8 @@ protected void copy(List<HadoopResourceId> srcResourceIds, 
List<HadoopResourceId
    *     and destination both exist. Thus no data is lost, however, duplicated 
resource are
    *     possible. In such scenarios, callers can use {@code match()} to 
determine the state of the
    *     resource.
-   * @throws FileAlreadyExistsException if the target resources already exist.
+   * @throws FileAlreadyExistsException if a target resource already exists 
and couldn't be
+   *     overwritten.
    * @throws IOException if the underlying filesystem indicates the rename was 
not performed but no
    *     other errors were thrown.
    */
@@ -179,46 +181,61 @@ protected void rename(
       throws IOException {
     for (int i = 0; i < srcResourceIds.size(); ++i) {
 
+      Path src = srcResourceIds.get(i).toPath();
+      Path dest = destResourceIds.get(i).toPath();
+
       // rename in HDFS requires the target directory to exist or silently 
fails (BEAM-4861)
-      Path targetDirectory = destResourceIds.get(i).toPath().getParent();
-      if (!fileSystem.exists(targetDirectory)) {
+      mkdirs(dest);
+
+      boolean success = fileSystem.rename(src, dest);
+
+      // If the failure was due to the file already existing, delete and retry 
(BEAM-5036).
+      // This should be the exceptional case, so handle here rather than incur 
the overhead of testing first
+      if (!success && fileSystem.exists(src) && fileSystem.exists(dest)) {
         LOG.debug(
-            String.format(
-                LOG_CREATE_DIRECTORY, 
Path.getPathWithoutSchemeAndAuthority(targetDirectory)));
-        boolean success = fileSystem.mkdirs(targetDirectory);
-        if (!success) {
-          throw new IOException(
-              String.format(
-                  "Unable to create target directory %s. No further 
information provided by underlying filesystem.",
-                  targetDirectory));
-        }
+            String.format(LOG_DELETING_EXISTING_FILE, 
Path.getPathWithoutSchemeAndAuthority(dest)));
+        fileSystem.delete(dest, false); // not recursive
+        success = fileSystem.rename(src, dest);
       }
 
-      boolean success =
-          fileSystem.rename(srcResourceIds.get(i).toPath(), 
destResourceIds.get(i).toPath());
       if (!success) {
-        if (!fileSystem.exists(srcResourceIds.get(i).toPath())) {
+        if (!fileSystem.exists(src)) {
           throw new FileNotFoundException(
-              String.format(
-                  "Unable to rename resource %s to %s as source not found.",
-                  srcResourceIds.get(i).toPath(), 
destResourceIds.get(i).toPath()));
+              String.format("Unable to rename resource %s to %s as source not 
found.", src, dest));
 
-        } else if (fileSystem.exists(destResourceIds.get(i).toPath())) {
+        } else if (fileSystem.exists(dest)) {
           throw new FileAlreadyExistsException(
               String.format(
-                  "Unable to rename resource %s to %s as destination already 
exists.",
-                  srcResourceIds.get(i).toPath(), 
destResourceIds.get(i).toPath()));
+                  "Unable to rename resource %s to %s as destination already 
exists and couldn't be deleted.",
+                  src, dest));
 
         } else {
           throw new IOException(
               String.format(
                   "Unable to rename resource %s to %s. No further information 
provided by underlying filesystem.",
-                  srcResourceIds.get(i).toPath(), 
destResourceIds.get(i).toPath()));
+                  src, dest));
         }
       }
     }
   }
 
+  /** Ensures that the target directory exists for the given filePath. */
+  private void mkdirs(Path filePath) throws IOException {
+    Path targetDirectory = filePath.getParent();
+    if (!fileSystem.exists(targetDirectory)) {
+      LOG.debug(
+          String.format(
+              LOG_CREATE_DIRECTORY, 
Path.getPathWithoutSchemeAndAuthority(targetDirectory)));
+      boolean success = fileSystem.mkdirs(targetDirectory);
+      if (!success) {
+        throw new IOException(
+            String.format(
+                "Unable to create target directory %s. No further information 
provided by underlying filesystem.",
+                targetDirectory));
+      }
+    }
+  }
+
   @Override
   protected void delete(Collection<HadoopResourceId> resourceIds) throws 
IOException {
     for (HadoopResourceId resourceId : resourceIds) {
diff --git 
a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
 
b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
index e9e9aaab648..ae07af922e7 100644
--- 
a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
+++ 
b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -36,7 +36,6 @@
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.FileAlreadyExistsException;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.io.FileSystems;
@@ -301,7 +300,8 @@ public void testRenameMissingSource() throws Exception {
         ImmutableList.of(testPath("missingFile")), 
ImmutableList.of(testPath("testFileA")));
   }
 
-  @Test(expected = FileAlreadyExistsException.class)
+  /** Test that rename overwrites existing files. */
+  @Test
   public void testRenameExistingDestination() throws Exception {
     create("testFileA", "testDataA".getBytes(StandardCharsets.UTF_8));
     create("testFileB", "testDataB".getBytes(StandardCharsets.UTF_8));
@@ -312,6 +312,10 @@ public void testRenameExistingDestination() throws 
Exception {
 
     fileSystem.rename(
         ImmutableList.of(testPath("testFileA")), 
ImmutableList.of(testPath("testFileB")));
+
+    expectedLogs.verifyDebug(
+        String.format(HadoopFileSystem.LOG_DELETING_EXISTING_FILE, 
"/testFileB"));
+    assertArrayEquals("testDataA".getBytes(StandardCharsets.UTF_8), 
read("testFileB", 0));
   }
 
   /** Test that rename throws predictably when source doesn't exist and 
destination does. */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 160662)
    Time Spent: 14h 20m  (was: 14h 10m)

> Optimize FileBasedSink's WriteOperation.moveToOutput()
> ------------------------------------------------------
>
>                 Key: BEAM-5036
>                 URL: https://issues.apache.org/jira/browse/BEAM-5036
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-files
>    Affects Versions: 2.5.0
>            Reporter: Jozef Vilcek
>            Assignee: Tim Robertson
>            Priority: Major
>          Time Spent: 14h 20m
>  Remaining Estimate: 0h
>
> The moveToOutput() methods in FileBasedSink.WriteOperation implement the move
> as a copy followed by a delete. It would be better to use rename(), which can
> be much more efficient on some filesystems.
> The filesystem must support cross-directory renames. BEAM-4861 is related to
> this for the case of the HDFS filesystem.
> The feature was discussed here:
> http://mail-archives.apache.org/mod_mbox/beam-dev/201807.mbox/%3CCAF9t7_4Mp54pQ+vRrJrBh9Vx0=uaknupzd_qdh_qdm9vxll...@mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to