[BEAM-2277] HadoopFileSystem: normalize implementation

* Drop empty authority always
* Resolve directories
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/15df211c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/15df211c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/15df211c Branch: refs/heads/master Commit: 15df211c758e7c8f05c3136f25bbe18e3f394321 Parents: ec956c8 Author: Dan Halperin <[email protected]> Authored: Fri May 12 11:11:06 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Fri May 12 14:59:10 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/io/fs/ResourceIdTester.java | 15 +++++---- .../beam/sdk/io/hdfs/HadoopFileSystem.java | 32 +++++++++++--------- .../beam/sdk/io/hdfs/HadoopResourceId.java | 16 +++++++++- .../beam/sdk/io/hdfs/HadoopFileSystemTest.java | 3 +- .../beam/sdk/io/hdfs/HadoopResourceIdTest.java | 22 +++++++++----- 5 files changed, 56 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/15df211c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java index fe50ada..8ceaeed 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java @@ -21,9 +21,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY; import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE; import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertFalse; +import static org.hamcrest.Matchers.is; 
import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.google.common.testing.EqualsTester; @@ -66,16 +65,16 @@ public final class ResourceIdTester { ResourceId file2a = baseDirectory.resolve("child2", RESOLVE_FILE); allResourceIds.add(file1); allResourceIds.add(file2); - assertFalse("Resolved file isDirectory()", file1.isDirectory()); - assertFalse("Resolved file isDirectory()", file2.isDirectory()); - assertFalse("Resolved file isDirectory()", file2a.isDirectory()); + assertThat("Resolved file isDirectory()", file1.isDirectory(), is(false)); + assertThat("Resolved file isDirectory()", file2.isDirectory(), is(false)); + assertThat("Resolved file isDirectory()", file2a.isDirectory(), is(false)); ResourceId dir1 = baseDirectory.resolve("child1", RESOLVE_DIRECTORY); ResourceId dir2 = baseDirectory.resolve("child2", RESOLVE_DIRECTORY); ResourceId dir2a = baseDirectory.resolve("child2", RESOLVE_DIRECTORY); - assertTrue("Resolved directory isDirectory()", dir1.isDirectory()); - assertTrue("Resolved directory isDirectory()", dir2.isDirectory()); - assertTrue("Resolved directory isDirectory()", dir2a.isDirectory()); + assertThat("Resolved directory isDirectory()", dir1.isDirectory(), is(true)); + assertThat("Resolved directory isDirectory()", dir2.isDirectory(), is(true)); + assertThat("Resolved directory isDirectory()", dir2a.isDirectory(), is(true)); allResourceIds.add(dir1); allResourceIds.add(dir2); http://git-wip-us.apache.org/repos/asf/beam/blob/15df211c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java index 154a818..d519a8c 100644 --- 
a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java +++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java @@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; @@ -82,8 +81,9 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> { List<Metadata> metadata = new ArrayList<>(); for (FileStatus fileStatus : fileStatuses) { if (fileStatus.isFile()) { + URI uri = dropEmptyAuthority(fileStatus.getPath().toUri().toString()); metadata.add(Metadata.builder() - .setResourceId(new HadoopResourceId(fileStatus.getPath().toUri())) + .setResourceId(new HadoopResourceId(uri)) .setIsReadSeekEfficient(true) .setSizeBytes(fileStatus.getLen()) .build()); @@ -151,19 +151,13 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> { @Override protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) { - try { - if (singleResourceSpec.endsWith("/") && !isDirectory) { - throw new IllegalArgumentException(String.format( - "Expected file path but received directory path %s", singleResourceSpec)); - } - return !singleResourceSpec.endsWith("/") && isDirectory - ? new HadoopResourceId(new URI(singleResourceSpec + "/")) - : new HadoopResourceId(new URI(singleResourceSpec)); - } catch (URISyntaxException e) { - throw new IllegalArgumentException( - String.format("Invalid spec %s directory %s", singleResourceSpec, isDirectory), - e); + if (singleResourceSpec.endsWith("/") && !isDirectory) { + throw new IllegalArgumentException(String.format( + "Expected file path but received directory path %s", singleResourceSpec)); } + return !singleResourceSpec.endsWith("/") && isDirectory + ? 
new HadoopResourceId(dropEmptyAuthority(singleResourceSpec + "/")) + : new HadoopResourceId(dropEmptyAuthority(singleResourceSpec)); } @Override @@ -237,4 +231,14 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> { inputStream.close(); } } + + private static URI dropEmptyAuthority(String uriStr) { + URI uri = URI.create(uriStr); + String prefix = uri.getScheme() + ":///"; + if (uriStr.startsWith(prefix)) { + return URI.create(uri.getScheme() + ":/" + uriStr.substring(prefix.length())); + } else { + return uri; + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/15df211c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java index e570864..88fa32a 100644 --- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java +++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java @@ -17,9 +17,12 @@ */ package org.apache.beam.sdk.io.hdfs; +import static com.google.common.base.Preconditions.checkArgument; + import java.net.URI; import java.util.Objects; import org.apache.beam.sdk.io.fs.ResolveOptions; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.hadoop.fs.Path; @@ -35,7 +38,18 @@ class HadoopResourceId implements ResourceId { @Override public ResourceId resolve(String other, ResolveOptions resolveOptions) { - return new HadoopResourceId(uri.resolve(other)); + if (resolveOptions == StandardResolveOptions.RESOLVE_DIRECTORY) { + if (!other.endsWith("/")) { + other += '/'; + } + return new HadoopResourceId(uri.resolve(other)); + } else if (resolveOptions == 
StandardResolveOptions.RESOLVE_FILE) { + checkArgument(!other.endsWith("/"), "Resolving a file with a directory path: %s", other); + return new HadoopResourceId(uri.resolve(other)); + } else { + throw new UnsupportedOperationException( + String.format("Unexpected StandardResolveOptions %s", resolveOptions)); + } } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/15df211c/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java index cf86c36..14591d8 100644 --- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java +++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java @@ -63,14 +63,13 @@ public class HadoopFileSystemTest { @Rule public TestPipeline p = TestPipeline.create(); @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @Rule public ExpectedException thrown = ExpectedException.none(); - private Configuration configuration; private MiniDFSCluster hdfsCluster; private URI hdfsClusterBaseUri; private HadoopFileSystem fileSystem; @Before public void setUp() throws Exception { - configuration = new Configuration(); + Configuration configuration = new Configuration(); configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath()); MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration); hdfsCluster = builder.build(); http://git-wip-us.apache.org/repos/asf/beam/blob/15df211c/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java index b0d821b..f179132 100644 --- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java +++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java @@ -18,9 +18,11 @@ package org.apache.beam.sdk.io.hdfs; import java.net.URI; +import java.util.Collections; import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.fs.ResourceIdTester; -import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; @@ -33,21 +35,25 @@ import org.junit.rules.TemporaryFolder; * Tests for {@link HadoopResourceId}. */ public class HadoopResourceIdTest { - private Configuration configuration; + private MiniDFSCluster hdfsCluster; private URI hdfsClusterBaseUri; - private HadoopFileSystem fileSystem; + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @Before public void setUp() throws Exception { - configuration = new Configuration(); + Configuration configuration = new Configuration(); configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath()); MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration); hdfsCluster = builder.build(); hdfsClusterBaseUri = new URI(configuration.get("fs.defaultFS") + "/"); - fileSystem = new HadoopFileSystem(configuration); + + // Register HadoopFileSystem for this test. 
+ HadoopFileSystemOptions options = PipelineOptionsFactory.as(HadoopFileSystemOptions.class); + options.setHdfsConfiguration(Collections.singletonList(configuration)); + FileSystems.setDefaultConfigInWorkers(options); } @After @@ -57,7 +63,9 @@ public class HadoopResourceIdTest { @Test public void testResourceIdTester() throws Exception { - FileSystems.setDefaultConfigInWorkers(TestPipeline.testingPipelineOptions()); - ResourceIdTester.runResourceIdBattery(new HadoopResourceId(hdfsClusterBaseUri)); + ResourceId baseDirectory = + FileSystems.matchNewResource( + "hdfs://" + hdfsClusterBaseUri.getPath(), true /* isDirectory */); + ResourceIdTester.runResourceIdBattery(baseDirectory); } }
