[ 
https://issues.apache.org/jira/browse/BEAM-4861?focusedWorklogId=145691&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145691
 ]

ASF GitHub Bot logged work on BEAM-4861:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Sep/18 15:06
            Start Date: 19/Sep/18 15:06
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #6285: [BEAM-4861] 
Autocreate directories when doing an HDFS rename
URL: https://github.com/apache/beam/pull/6285
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as GitHub does not otherwise display it after the merge):

diff --git 
a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
 
b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
index 8230c699836..b08a70fc943 100644
--- 
a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ 
b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -19,6 +19,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
@@ -26,6 +27,7 @@
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SeekableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileAlreadyExistsException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -41,6 +43,8 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Adapts {@link org.apache.hadoop.fs.FileSystem} connectors to be used as 
Apache Beam {@link
@@ -68,6 +72,9 @@
  * </ul>
  */
 class HadoopFileSystem extends FileSystem<HadoopResourceId> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HadoopFileSystem.class);
+
+  @VisibleForTesting static final String LOG_CREATE_DIRECTORY = "Creating 
directory %s";
   @VisibleForTesting final org.apache.hadoop.fs.FileSystem fileSystem;
 
   HadoopFileSystem(Configuration configuration) throws IOException {
@@ -129,29 +136,93 @@ protected void copy(List<HadoopResourceId> 
srcResourceIds, List<HadoopResourceId
       // implementing it. The DFSFileSystem implemented concat by deleting the 
srcs after which
       // is not what we want. Also, all the other FileSystem implementations I 
saw threw
       // UnsupportedOperationException within concat.
-      FileUtil.copy(
-          fileSystem,
-          srcResourceIds.get(i).toPath(),
-          fileSystem,
-          destResourceIds.get(i).toPath(),
-          false,
-          true,
-          fileSystem.getConf());
+      boolean success =
+          FileUtil.copy(
+              fileSystem,
+              srcResourceIds.get(i).toPath(),
+              fileSystem,
+              destResourceIds.get(i).toPath(),
+              false,
+              true,
+              fileSystem.getConf());
+      if (!success) {
+        // Defensive coding as this should not happen in practice
+        throw new IOException(
+            String.format(
+                "Unable to copy resource %s to %s. No further information 
provided by underlying filesystem.",
+                srcResourceIds.get(i).toPath(), 
destResourceIds.get(i).toPath()));
+      }
     }
   }
 
+  /**
+   * Renames a {@link List} of file-like resources from one location to 
another.
+   *
+   * <p>The number of source resources must equal the number of destination 
resources. Destination
+   * resources will be created recursively.
+   *
+   * @param srcResourceIds the references of the source resources
+   * @param destResourceIds the references of the destination resources
+   * @throws FileNotFoundException if the source resources are missing. When 
rename throws, the
+   *     state of the resources is unknown but safe: for every (source, 
destination) pair of
+   *     resources, the following are possible: a) source exists, b) 
destination exists, c) source
+   *     and destination both exist. Thus no data is lost, however, duplicated 
resource are
+   *     possible. In such scenarios, callers can use {@code match()} to 
determine the state of the
+   *     resource.
+   * @throws FileAlreadyExistsException if the target resources already exist.
+   * @throws IOException if the underlying filesystem indicates the rename was 
not performed but no
+   *     other errors were thrown.
+   */
   @Override
   protected void rename(
       List<HadoopResourceId> srcResourceIds, List<HadoopResourceId> 
destResourceIds)
       throws IOException {
     for (int i = 0; i < srcResourceIds.size(); ++i) {
-      fileSystem.rename(srcResourceIds.get(i).toPath(), 
destResourceIds.get(i).toPath());
+
+      // rename in HDFS requires the target directory to exist or silently 
fails (BEAM-4861)
+      Path targetDirectory = destResourceIds.get(i).toPath().getParent();
+      if (!fileSystem.exists(targetDirectory)) {
+        LOG.debug(
+            String.format(
+                LOG_CREATE_DIRECTORY, 
Path.getPathWithoutSchemeAndAuthority(targetDirectory)));
+        boolean success = fileSystem.mkdirs(targetDirectory);
+        if (!success) {
+          throw new IOException(
+              String.format(
+                  "Unable to create target directory %s. No further 
information provided by underlying filesystem.",
+                  targetDirectory));
+        }
+      }
+
+      boolean success =
+          fileSystem.rename(srcResourceIds.get(i).toPath(), 
destResourceIds.get(i).toPath());
+      if (!success) {
+        if (!fileSystem.exists(srcResourceIds.get(i).toPath())) {
+          throw new FileNotFoundException(
+              String.format(
+                  "Unable to rename resource %s to %s as source not found.",
+                  srcResourceIds.get(i).toPath(), 
destResourceIds.get(i).toPath()));
+
+        } else if (fileSystem.exists(destResourceIds.get(i).toPath())) {
+          throw new FileAlreadyExistsException(
+              String.format(
+                  "Unable to rename resource %s to %s as destination already 
exists.",
+                  srcResourceIds.get(i).toPath(), 
destResourceIds.get(i).toPath()));
+
+        } else {
+          throw new IOException(
+              String.format(
+                  "Unable to rename resource %s to %s. No further information 
provided by underlying filesystem.",
+                  srcResourceIds.get(i).toPath(), 
destResourceIds.get(i).toPath()));
+        }
+      }
     }
   }
 
   @Override
   protected void delete(Collection<HadoopResourceId> resourceIds) throws 
IOException {
     for (HadoopResourceId resourceId : resourceIds) {
+      // ignore response as issues are surfaced with exception
       fileSystem.delete(resourceId.toPath(), false);
     }
   }
diff --git 
a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
 
b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
index 5b962ee533b..e9e9aaab648 100644
--- 
a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
+++ 
b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -28,6 +28,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.io.ByteStreams;
+import java.io.FileNotFoundException;
 import java.io.InputStream;
 import java.net.URI;
 import java.nio.ByteBuffer;
@@ -35,6 +36,7 @@
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.FileAlreadyExistsException;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.io.FileSystems;
@@ -43,6 +45,7 @@
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.MatchResult.Status;
+import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.util.MimeTypes;
@@ -66,6 +69,7 @@
   @Rule public TestPipeline p = TestPipeline.create();
   @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
   @Rule public ExpectedException thrown = ExpectedException.none();
+  @Rule public final ExpectedLogs expectedLogs = 
ExpectedLogs.none(HadoopFileSystem.class);
   private MiniDFSCluster hdfsCluster;
   private URI hdfsClusterBaseUri;
   private HadoopFileSystem fileSystem;
@@ -124,6 +128,12 @@ public void testCopy() throws Exception {
     assertArrayEquals("testDataB".getBytes(StandardCharsets.UTF_8), 
read("copyTestFileB", 0));
   }
 
+  @Test(expected = FileNotFoundException.class)
+  public void testCopySourceMissing() throws Exception {
+    fileSystem.copy(
+        ImmutableList.of(testPath("missingFile")), 
ImmutableList.of(testPath("copyTestFile")));
+  }
+
   @Test
   public void testDelete() throws Exception {
     create("testFileA", "testDataA".getBytes(StandardCharsets.UTF_8));
@@ -152,6 +162,12 @@ public void testDelete() throws Exception {
                         .build()))));
   }
 
+  /** Verifies that an attempt to delete a non existing file is silently 
ignored. */
+  @Test
+  public void testDeleteNonExisting() throws Exception {
+    fileSystem.delete(ImmutableList.of(testPath("MissingFile")));
+  }
+
   @Test
   public void testMatch() throws Exception {
     create("testFileAA", "testDataAA".getBytes(StandardCharsets.UTF_8));
@@ -255,6 +271,59 @@ public void testRename() throws Exception {
     assertArrayEquals("testDataB".getBytes(StandardCharsets.UTF_8), 
read("renameFileB", 0));
   }
 
+  /** Ensure that missing parent directories are created when required. */
+  @Test
+  public void testRenameMissingTargetDir() throws Exception {
+    create("pathA/testFileA", "testDataA".getBytes(StandardCharsets.UTF_8));
+    create("pathA/testFileB", "testDataB".getBytes(StandardCharsets.UTF_8));
+
+    // ensure files exist
+    assertArrayEquals("testDataA".getBytes(StandardCharsets.UTF_8), 
read("pathA/testFileA", 0));
+    assertArrayEquals("testDataB".getBytes(StandardCharsets.UTF_8), 
read("pathA/testFileB", 0));
+
+    // move to a directory that does not exist
+    fileSystem.rename(
+        ImmutableList.of(testPath("pathA/testFileA"), 
testPath("pathA/testFileB")),
+        ImmutableList.of(testPath("pathB/testFileA"), 
testPath("pathB/pathC/pathD/testFileB")));
+
+    // ensure the directories were created and the files can be read
+    
expectedLogs.verifyDebug(String.format(HadoopFileSystem.LOG_CREATE_DIRECTORY, 
"/pathB"));
+    expectedLogs.verifyDebug(
+        String.format(HadoopFileSystem.LOG_CREATE_DIRECTORY, 
"/pathB/pathC/pathD"));
+    assertArrayEquals("testDataA".getBytes(StandardCharsets.UTF_8), 
read("pathB/testFileA", 0));
+    assertArrayEquals(
+        "testDataB".getBytes(StandardCharsets.UTF_8), 
read("pathB/pathC/pathD/testFileB", 0));
+  }
+
+  @Test(expected = FileNotFoundException.class)
+  public void testRenameMissingSource() throws Exception {
+    fileSystem.rename(
+        ImmutableList.of(testPath("missingFile")), 
ImmutableList.of(testPath("testFileA")));
+  }
+
+  @Test(expected = FileAlreadyExistsException.class)
+  public void testRenameExistingDestination() throws Exception {
+    create("testFileA", "testDataA".getBytes(StandardCharsets.UTF_8));
+    create("testFileB", "testDataB".getBytes(StandardCharsets.UTF_8));
+
+    // ensure files exist
+    assertArrayEquals("testDataA".getBytes(StandardCharsets.UTF_8), 
read("testFileA", 0));
+    assertArrayEquals("testDataB".getBytes(StandardCharsets.UTF_8), 
read("testFileB", 0));
+
+    fileSystem.rename(
+        ImmutableList.of(testPath("testFileA")), 
ImmutableList.of(testPath("testFileB")));
+  }
+
+  /** Test that rename throws predictably when source doesn't exist and 
destination does. */
+  @Test(expected = FileNotFoundException.class)
+  public void testRenameRetryScenario() throws Exception {
+    testRename();
+    // retry the knowing that sources are already moved to destination
+    fileSystem.rename(
+        ImmutableList.of(testPath("testFileA"), testPath("testFileB")),
+        ImmutableList.of(testPath("renameFileA"), testPath("renameFileB")));
+  }
+
   @Test
   public void testMatchNewResource() {
     // match file spec


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 145691)
    Time Spent: 1h  (was: 50m)

> Hadoop Filesystem silently fails
> --------------------------------
>
>                 Key: BEAM-4861
>                 URL: https://issues.apache.org/jira/browse/BEAM-4861
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-hadoop
>            Reporter: Jozef Vilcek
>            Assignee: Tim Robertson
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Hi,
> Beam Filesystem operations copy, rename and delete are void in the SDK. Hadoop 
> native filesystem operations are not void and return a boolean result. The current 
> implementation in Beam ignores this result and passes as long as no exception is thrown.
> I got burned by this when using 'rename' to do a 'move' operation on HDFS. If 
> the target directory does not exist, the operation returns false and does not 
> touch the file.
> [https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L148]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to