[
https://issues.apache.org/jira/browse/BEAM-4861?focusedWorklogId=145691&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145691
]
ASF GitHub Bot logged work on BEAM-4861:
----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Sep/18 15:06
Start Date: 19/Sep/18 15:06
Worklog Time Spent: 10m
Work Description: iemejia closed pull request #6285: [BEAM-4861]
Autocreate directories when doing an HDFS rename
URL: https://github.com/apache/beam/pull/6285
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
index 8230c699836..b08a70fc943 100644
---
a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++
b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -19,6 +19,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
@@ -26,6 +27,7 @@
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -41,6 +43,8 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Adapts {@link org.apache.hadoop.fs.FileSystem} connectors to be used as
Apache Beam {@link
@@ -68,6 +72,9 @@
* </ul>
*/
class HadoopFileSystem extends FileSystem<HadoopResourceId> {
+ private static final Logger LOG =
LoggerFactory.getLogger(HadoopFileSystem.class);
+
+ @VisibleForTesting static final String LOG_CREATE_DIRECTORY = "Creating
directory %s";
@VisibleForTesting final org.apache.hadoop.fs.FileSystem fileSystem;
HadoopFileSystem(Configuration configuration) throws IOException {
@@ -129,29 +136,93 @@ protected void copy(List<HadoopResourceId>
srcResourceIds, List<HadoopResourceId
// implementing it. The DFSFileSystem implemented concat by deleting the
srcs after which
// is not what we want. Also, all the other FileSystem implementations I
saw threw
// UnsupportedOperationException within concat.
- FileUtil.copy(
- fileSystem,
- srcResourceIds.get(i).toPath(),
- fileSystem,
- destResourceIds.get(i).toPath(),
- false,
- true,
- fileSystem.getConf());
+ boolean success =
+ FileUtil.copy(
+ fileSystem,
+ srcResourceIds.get(i).toPath(),
+ fileSystem,
+ destResourceIds.get(i).toPath(),
+ false,
+ true,
+ fileSystem.getConf());
+ if (!success) {
+ // Defensive coding as this should not happen in practice
+ throw new IOException(
+ String.format(
+ "Unable to copy resource %s to %s. No further information
provided by underlying filesystem.",
+ srcResourceIds.get(i).toPath(),
destResourceIds.get(i).toPath()));
+ }
}
}
+ /**
+ * Renames a {@link List} of file-like resources from one location to
another.
+ *
+ * <p>The number of source resources must equal the number of destination
resources. Destination
+ * resources will be created recursively.
+ *
+ * @param srcResourceIds the references of the source resources
+ * @param destResourceIds the references of the destination resources
+ * @throws FileNotFoundException if the source resources are missing. When
rename throws, the
+ * state of the resources is unknown but safe: for every (source,
destination) pair of
+ * resources, the following are possible: a) source exists, b)
destination exists, c) source
+ * and destination both exist. Thus no data is lost, however, duplicated
resource are
+ * possible. In such scenarios, callers can use {@code match()} to
determine the state of the
+ * resource.
+ * @throws FileAlreadyExistsException if the target resources already exist.
+ * @throws IOException if the underlying filesystem indicates the rename was
not performed but no
+ * other errors were thrown.
+ */
@Override
protected void rename(
List<HadoopResourceId> srcResourceIds, List<HadoopResourceId>
destResourceIds)
throws IOException {
for (int i = 0; i < srcResourceIds.size(); ++i) {
- fileSystem.rename(srcResourceIds.get(i).toPath(),
destResourceIds.get(i).toPath());
+
+ // rename in HDFS requires the target directory to exist or silently
fails (BEAM-4861)
+ Path targetDirectory = destResourceIds.get(i).toPath().getParent();
+ if (!fileSystem.exists(targetDirectory)) {
+ LOG.debug(
+ String.format(
+ LOG_CREATE_DIRECTORY,
Path.getPathWithoutSchemeAndAuthority(targetDirectory)));
+ boolean success = fileSystem.mkdirs(targetDirectory);
+ if (!success) {
+ throw new IOException(
+ String.format(
+ "Unable to create target directory %s. No further
information provided by underlying filesystem.",
+ targetDirectory));
+ }
+ }
+
+ boolean success =
+ fileSystem.rename(srcResourceIds.get(i).toPath(),
destResourceIds.get(i).toPath());
+ if (!success) {
+ if (!fileSystem.exists(srcResourceIds.get(i).toPath())) {
+ throw new FileNotFoundException(
+ String.format(
+ "Unable to rename resource %s to %s as source not found.",
+ srcResourceIds.get(i).toPath(),
destResourceIds.get(i).toPath()));
+
+ } else if (fileSystem.exists(destResourceIds.get(i).toPath())) {
+ throw new FileAlreadyExistsException(
+ String.format(
+ "Unable to rename resource %s to %s as destination already
exists.",
+ srcResourceIds.get(i).toPath(),
destResourceIds.get(i).toPath()));
+
+ } else {
+ throw new IOException(
+ String.format(
+ "Unable to rename resource %s to %s. No further information
provided by underlying filesystem.",
+ srcResourceIds.get(i).toPath(),
destResourceIds.get(i).toPath()));
+ }
+ }
}
}
@Override
protected void delete(Collection<HadoopResourceId> resourceIds) throws
IOException {
for (HadoopResourceId resourceId : resourceIds) {
+ // ignore response as issues are surfaced with exception
fileSystem.delete(resourceId.toPath(), false);
}
}
diff --git
a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
index 5b962ee533b..e9e9aaab648 100644
---
a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
+++
b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -28,6 +28,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.io.ByteStreams;
+import java.io.FileNotFoundException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
@@ -35,6 +36,7 @@
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
+import java.nio.file.FileAlreadyExistsException;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.io.FileSystems;
@@ -43,6 +45,7 @@
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.MatchResult.Status;
+import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.util.MimeTypes;
@@ -66,6 +69,7 @@
@Rule public TestPipeline p = TestPipeline.create();
@Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
@Rule public ExpectedException thrown = ExpectedException.none();
+ @Rule public final ExpectedLogs expectedLogs =
ExpectedLogs.none(HadoopFileSystem.class);
private MiniDFSCluster hdfsCluster;
private URI hdfsClusterBaseUri;
private HadoopFileSystem fileSystem;
@@ -124,6 +128,12 @@ public void testCopy() throws Exception {
assertArrayEquals("testDataB".getBytes(StandardCharsets.UTF_8),
read("copyTestFileB", 0));
}
+ @Test(expected = FileNotFoundException.class)
+ public void testCopySourceMissing() throws Exception {
+ fileSystem.copy(
+ ImmutableList.of(testPath("missingFile")),
ImmutableList.of(testPath("copyTestFile")));
+ }
+
@Test
public void testDelete() throws Exception {
create("testFileA", "testDataA".getBytes(StandardCharsets.UTF_8));
@@ -152,6 +162,12 @@ public void testDelete() throws Exception {
.build()))));
}
+ /** Verifies that an attempt to delete a non existing file is silently
ignored. */
+ @Test
+ public void testDeleteNonExisting() throws Exception {
+ fileSystem.delete(ImmutableList.of(testPath("MissingFile")));
+ }
+
@Test
public void testMatch() throws Exception {
create("testFileAA", "testDataAA".getBytes(StandardCharsets.UTF_8));
@@ -255,6 +271,59 @@ public void testRename() throws Exception {
assertArrayEquals("testDataB".getBytes(StandardCharsets.UTF_8),
read("renameFileB", 0));
}
+ /** Ensure that missing parent directories are created when required. */
+ @Test
+ public void testRenameMissingTargetDir() throws Exception {
+ create("pathA/testFileA", "testDataA".getBytes(StandardCharsets.UTF_8));
+ create("pathA/testFileB", "testDataB".getBytes(StandardCharsets.UTF_8));
+
+ // ensure files exist
+ assertArrayEquals("testDataA".getBytes(StandardCharsets.UTF_8),
read("pathA/testFileA", 0));
+ assertArrayEquals("testDataB".getBytes(StandardCharsets.UTF_8),
read("pathA/testFileB", 0));
+
+ // move to a directory that does not exist
+ fileSystem.rename(
+ ImmutableList.of(testPath("pathA/testFileA"),
testPath("pathA/testFileB")),
+ ImmutableList.of(testPath("pathB/testFileA"),
testPath("pathB/pathC/pathD/testFileB")));
+
+ // ensure the directories were created and the files can be read
+
expectedLogs.verifyDebug(String.format(HadoopFileSystem.LOG_CREATE_DIRECTORY,
"/pathB"));
+ expectedLogs.verifyDebug(
+ String.format(HadoopFileSystem.LOG_CREATE_DIRECTORY,
"/pathB/pathC/pathD"));
+ assertArrayEquals("testDataA".getBytes(StandardCharsets.UTF_8),
read("pathB/testFileA", 0));
+ assertArrayEquals(
+ "testDataB".getBytes(StandardCharsets.UTF_8),
read("pathB/pathC/pathD/testFileB", 0));
+ }
+
+ @Test(expected = FileNotFoundException.class)
+ public void testRenameMissingSource() throws Exception {
+ fileSystem.rename(
+ ImmutableList.of(testPath("missingFile")),
ImmutableList.of(testPath("testFileA")));
+ }
+
+ @Test(expected = FileAlreadyExistsException.class)
+ public void testRenameExistingDestination() throws Exception {
+ create("testFileA", "testDataA".getBytes(StandardCharsets.UTF_8));
+ create("testFileB", "testDataB".getBytes(StandardCharsets.UTF_8));
+
+ // ensure files exist
+ assertArrayEquals("testDataA".getBytes(StandardCharsets.UTF_8),
read("testFileA", 0));
+ assertArrayEquals("testDataB".getBytes(StandardCharsets.UTF_8),
read("testFileB", 0));
+
+ fileSystem.rename(
+ ImmutableList.of(testPath("testFileA")),
ImmutableList.of(testPath("testFileB")));
+ }
+
+ /** Test that rename throws predictably when source doesn't exist and
destination does. */
+ @Test(expected = FileNotFoundException.class)
+ public void testRenameRetryScenario() throws Exception {
+ testRename();
+ // retry the knowing that sources are already moved to destination
+ fileSystem.rename(
+ ImmutableList.of(testPath("testFileA"), testPath("testFileB")),
+ ImmutableList.of(testPath("renameFileA"), testPath("renameFileB")));
+ }
+
@Test
public void testMatchNewResource() {
// match file spec
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 145691)
Time Spent: 1h (was: 50m)
> Hadoop Filesystem silently fails
> --------------------------------
>
> Key: BEAM-4861
> URL: https://issues.apache.org/jira/browse/BEAM-4861
> Project: Beam
> Issue Type: Bug
> Components: io-java-hadoop
> Reporter: Jozef Vilcek
> Assignee: Tim Robertson
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> Hi,
> Beam Filesystem operations copy, rename and delete are void in the SDK. Hadoop
> native filesystem operations are not void — they return a boolean result. The current
> implementation in Beam ignores that result and passes as long as no exception is thrown.
> I got burned by this when using 'rename' to do a 'move' operation on HDFS. If
> the target directory does not exist, the operation returns false and does not touch
> the file.
> [https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L148]
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)