Repository: beam Updated Branches: refs/heads/master 7fa00647d -> a3ba8e71b
[BEAM-2005, BEAM-2030, BEAM-2031, BEAM-2032, BEAM-2033, BEAM-2070] Base implementation of HadoopFileSystem. TODO: * Add multiplexing FileSystem that is able to route requests based upon the base URI when configured for multiple file systems. * Take a look at copy/rename again to see if we can do better than moving all the bytes through the local machine. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/23d16f7d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/23d16f7d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/23d16f7d Branch: refs/heads/master Commit: 23d16f7deb8130457d7fadab498cebc3c994ad76 Parents: 7fa0064 Author: Luke Cwik <[email protected]> Authored: Fri Apr 28 19:07:59 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Mon May 1 21:33:16 2017 -0700 ---------------------------------------------------------------------- sdks/java/io/hdfs/pom.xml | 44 ++++ .../beam/sdk/io/hdfs/HadoopFileSystem.java | 179 +++++++++++++- .../sdk/io/hdfs/HadoopFileSystemRegistrar.java | 27 +- .../beam/sdk/io/hdfs/HadoopResourceId.java | 46 +++- .../io/hdfs/HadoopFileSystemRegistrarTest.java | 39 ++- .../beam/sdk/io/hdfs/HadoopFileSystemTest.java | 244 +++++++++++++++++++ 6 files changed, 555 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/23d16f7d/sdks/java/io/hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml index 46cf8cf..daa3b26 100644 --- a/sdks/java/io/hdfs/pom.xml +++ b/sdks/java/io/hdfs/pom.xml @@ -44,6 +44,37 @@ </plugins> </build> + <properties> + <!-- + This is the version of Hadoop used to compile the hadoop-common module. + This dependency is defined with a provided scope. + Users must supply their own Hadoop version at runtime. + --> + <hadoop.version>2.7.3</hadoop.version> + </properties> + + <dependencyManagement> + <!-- + We define dependencies here instead of sdks/java/io because + of a version mismatch between this Hadoop version and other + Hadoop versions declared in other io submodules. 
+ --> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <classifier>tests</classifier> + <version>${hadoop.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <version>${hadoop.version}</version> + </dependency> + </dependencies> + </dependencyManagement> + <dependencies> <dependency> <groupId>org.apache.beam</groupId> @@ -131,6 +162,19 @@ <!-- test dependencies --> <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-direct-java</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/beam/blob/23d16f7d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java index a8bdd44..154a818 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java @@ -17,65 +17,224 @@ */ package org.apache.beam.sdk.io.hdfs; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SeekableByteChannel; import java.nio.channels.WritableByteChannel; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import org.apache.beam.sdk.io.FileSystem; import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.MatchResult.Status; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; /** * Adapts {@link org.apache.hadoop.fs.FileSystem} connectors to be used as * Apache Beam {@link FileSystem FileSystems}. + * + * <p>The following HDFS FileSystem(s) are known to be unsupported: + * <ul> + * <li>FTPFileSystem: Missing seek support within FTPInputStream</li> + * </ul> + * + * <p>This implementation assumes that the underlying Hadoop {@link FileSystem} is seek + * efficient when reading. 
The following {@link FSInputStream} implementations + * (as of Hadoop 2.7.1) do provide seek implementations: + * <ul> + * <li>HarFsInputStream</li> + * <li>S3InputStream</li> + * <li>DFSInputStream</li> + * <li>SwiftNativeInputStream</li> + * <li>NativeS3FsInputStream</li> + * <li>LocalFSFileInputStream</li> + * <li>NativeAzureFsInputStream</li> + * <li>S3AInputStream</li> + * </ul> */ class HadoopFileSystem extends FileSystem<HadoopResourceId> { + @VisibleForTesting + final org.apache.hadoop.fs.FileSystem fileSystem; - HadoopFileSystem() {} + HadoopFileSystem(Configuration configuration) throws IOException { + this.fileSystem = org.apache.hadoop.fs.FileSystem.newInstance(configuration); + } @Override - protected List<MatchResult> match(List<String> specs) throws IOException { - throw new UnsupportedOperationException(); + protected List<MatchResult> match(List<String> specs) { + ImmutableList.Builder<MatchResult> resultsBuilder = ImmutableList.builder(); + for (String spec : specs) { + try { + FileStatus[] fileStatuses = fileSystem.globStatus(new Path(spec)); + List<Metadata> metadata = new ArrayList<>(); + for (FileStatus fileStatus : fileStatuses) { + if (fileStatus.isFile()) { + metadata.add(Metadata.builder() + .setResourceId(new HadoopResourceId(fileStatus.getPath().toUri())) + .setIsReadSeekEfficient(true) + .setSizeBytes(fileStatus.getLen()) + .build()); + } + } + resultsBuilder.add(MatchResult.create(Status.OK, metadata)); + } catch (IOException e) { + resultsBuilder.add(MatchResult.create(Status.ERROR, e)); + } + } + return resultsBuilder.build(); } @Override protected WritableByteChannel create(HadoopResourceId resourceId, CreateOptions createOptions) throws IOException { - throw new UnsupportedOperationException(); + return Channels.newChannel(fileSystem.create(resourceId.toPath())); } @Override protected ReadableByteChannel open(HadoopResourceId resourceId) throws IOException { - throw new UnsupportedOperationException(); + FileStatus fileStatus = fileSystem.getFileStatus(resourceId.toPath()); + return new HadoopSeekableByteChannel(fileStatus, fileSystem.open(resourceId.toPath())); } @Override protected void copy( List<HadoopResourceId> srcResourceIds, List<HadoopResourceId> destResourceIds) throws IOException { - throw new UnsupportedOperationException(); + for (int i = 0; i < srcResourceIds.size(); ++i) { + // Unfortunately HDFS FileSystems don't support a native copy operation, so we are forced + // to use the inefficient implementation found in FileUtil, which copies all the bytes through + // the local machine. + // + // HDFS FileSystem does define a concat method, but we could only find DFSFileSystem + // implementing it, and DFSFileSystem implements concat by deleting the srcs afterwards, which + // is not what we want. Also, all the other FileSystem implementations we looked at threw + // UnsupportedOperationException within concat. 
+ FileUtil.copy( + fileSystem, srcResourceIds.get(i).toPath(), + fileSystem, destResourceIds.get(i).toPath(), + false, + true, + fileSystem.getConf()); + } } @Override protected void rename( List<HadoopResourceId> srcResourceIds, List<HadoopResourceId> destResourceIds) throws IOException { - throw new UnsupportedOperationException(); + for (int i = 0; i < srcResourceIds.size(); ++i) { + fileSystem.rename( + srcResourceIds.get(i).toPath(), + destResourceIds.get(i).toPath()); + } } @Override protected void delete(Collection<HadoopResourceId> resourceIds) throws IOException { - throw new UnsupportedOperationException(); + for (HadoopResourceId resourceId : resourceIds) { + fileSystem.delete(resourceId.toPath(), false); + } } @Override protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) { - throw new UnsupportedOperationException(); + try { + if (singleResourceSpec.endsWith("/") && !isDirectory) { + throw new IllegalArgumentException(String.format( + "Expected file path but received directory path %s", singleResourceSpec)); + } + return !singleResourceSpec.endsWith("/") && isDirectory + ? new HadoopResourceId(new URI(singleResourceSpec + "/")) + : new HadoopResourceId(new URI(singleResourceSpec)); + } catch (URISyntaxException e) { + throw new IllegalArgumentException( + String.format("Invalid spec %s directory %s", singleResourceSpec, isDirectory), + e); + } } @Override protected String getScheme() { - return "hdfs"; + return fileSystem.getScheme(); + } + + /** An adapter around {@link FSDataInputStream} that implements {@link SeekableByteChannel}. */ + private static class HadoopSeekableByteChannel implements SeekableByteChannel { + private final FileStatus fileStatus; + private final FSDataInputStream inputStream; + private boolean closed; + + private HadoopSeekableByteChannel(FileStatus fileStatus, FSDataInputStream inputStream) { + this.fileStatus = fileStatus; + this.inputStream = inputStream; + this.closed = false; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + if (closed) { + throw new IOException("Channel is closed"); + } + return inputStream.read(dst); + } + + @Override + public int write(ByteBuffer src) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long position() throws IOException { + if (closed) { + throw new IOException("Channel is closed"); + } + return inputStream.getPos(); + } + + @Override + public SeekableByteChannel position(long newPosition) throws IOException { + if (closed) { + throw new IOException("Channel is closed"); + } + inputStream.seek(newPosition); + return this; + } + + @Override + public long size() throws IOException { + if (closed) { + throw new IOException("Channel is closed"); + } + return fileStatus.getLen(); + } + + @Override + public SeekableByteChannel truncate(long size) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isOpen() { + return !closed; + } + + @Override + public void close() throws IOException { + closed = true; + inputStream.close(); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/23d16f7d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java index cc22f4f..9159df3 
100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java @@ -17,12 +17,18 @@ */ package org.apache.beam.sdk.io.hdfs; +import static com.google.common.base.Preconditions.checkArgument; + import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.util.Collections; +import java.util.List; import javax.annotation.Nonnull; import org.apache.beam.sdk.io.FileSystem; import org.apache.beam.sdk.io.FileSystemRegistrar; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.hadoop.conf.Configuration; /** * {@link AutoService} registrar for the {@link HadoopFileSystem}. @@ -32,6 +38,25 @@ public class HadoopFileSystemRegistrar implements FileSystemRegistrar { @Override public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) { - return ImmutableList.<FileSystem>of(new HadoopFileSystem()); + List<Configuration> configurations = + options.as(HadoopFileSystemOptions.class).getHdfsConfiguration(); + if (configurations == null) { + configurations = Collections.emptyList(); + } + checkArgument(configurations.size() <= 1, + String.format( + "The %s currently only supports at most a single Hadoop configuration.", + HadoopFileSystemRegistrar.class.getSimpleName())); + + ImmutableList.Builder<FileSystem> builder = ImmutableList.builder(); + for (Configuration configuration : configurations) { + try { + builder.add(new HadoopFileSystem(configuration)); + } catch (IOException e) { + throw new IllegalArgumentException(String.format( + "Failed to construct Hadoop filesystem with configuration %s", configuration), e); + } + } + return builder.build(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/23d16f7d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java index 8e0b58c..e570864 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java @@ -17,35 +17,65 @@ */ package org.apache.beam.sdk.io.hdfs; +import java.net.URI; +import java.util.Objects; import org.apache.beam.sdk.io.fs.ResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.hadoop.fs.Path; /** * {@link ResourceId} implementation for the {@link HadoopFileSystem}. */ -public class HadoopResourceId implements ResourceId { +class HadoopResourceId implements ResourceId { + private final URI uri; + + HadoopResourceId(URI uri) { + this.uri = uri; + } @Override public ResourceId resolve(String other, ResolveOptions resolveOptions) { - throw new UnsupportedOperationException(); + return new HadoopResourceId(uri.resolve(other)); } @Override public ResourceId getCurrentDirectory() { - throw new UnsupportedOperationException(); + return new HadoopResourceId(uri.getPath().endsWith("/") ? 
uri : uri.resolve(".")); + } + + public boolean isDirectory() { + return uri.getPath().endsWith("/"); + } + + @Override + public String getFilename() { + return new Path(uri).getName(); } @Override public String getScheme() { - throw new UnsupportedOperationException(); + return uri.getScheme(); } @Override - public String getFilename() { - throw new UnsupportedOperationException(); + public String toString() { + return uri.toString(); } - public boolean isDirectory() { - throw new UnsupportedOperationException(); + @Override + public boolean equals(Object obj) { + if (!(obj instanceof HadoopResourceId)) { + return false; + } + return Objects.equals(uri, ((HadoopResourceId) obj).uri); + } + + @Override + public int hashCode() { + return Objects.hashCode(uri); + } + + Path toPath() { + return new Path(uri); } } http://git-wip-us.apache.org/repos/asf/beam/blob/23d16f7d/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java index c332af5..96f7102 100644 --- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java +++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java @@ -17,17 +17,24 @@ */ package org.apache.beam.sdk.io.hdfs; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.instanceOf; -import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import java.net.URI; import java.util.ServiceLoader; import org.apache.beam.sdk.io.FileSystem; import org.apache.beam.sdk.io.FileSystemRegistrar; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -37,13 +44,35 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class HadoopFileSystemRegistrarTest { + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + private Configuration configuration; + private MiniDFSCluster hdfsCluster; + private URI hdfsClusterBaseUri; + + @Before + public void setUp() throws Exception { + configuration = new Configuration(); + configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath()); + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration); + hdfsCluster = builder.build(); + hdfsClusterBaseUri = new URI(configuration.get("fs.defaultFS") + "/"); + } + + @After + public void tearDown() throws Exception { + hdfsCluster.shutdown(); + } + @Test public void testServiceLoader() { + HadoopFileSystemOptions options = PipelineOptionsFactory.as(HadoopFileSystemOptions.class); + options.setHdfsConfiguration(ImmutableList.of(configuration)); for (FileSystemRegistrar registrar : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) { if (registrar instanceof HadoopFileSystemRegistrar) { - 
Iterable<FileSystem> fileSystems = registrar.fromOptions(PipelineOptionsFactory.create()); - assertThat(fileSystems, contains(instanceOf(HadoopFileSystem.class))); + Iterable<FileSystem> fileSystems = registrar.fromOptions(options); + assertEquals(hdfsClusterBaseUri.getScheme(), + ((HadoopFileSystem) Iterables.getOnlyElement(fileSystems)).getScheme()); return; } } http://git-wip-us.apache.org/repos/asf/beam/blob/23d16f7d/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java new file mode 100644 index 0000000..6cb5326 --- /dev/null +++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hdfs; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.io.ByteStreams; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.List; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.MatchResult.Status; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.util.MimeTypes; +import org.apache.beam.sdk.values.PCollection; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link HadoopFileSystem}. 
+ */ +@RunWith(JUnit4.class) +public class HadoopFileSystemTest { + + @Rule public TestPipeline p = TestPipeline.create(); + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public ExpectedException thrown = ExpectedException.none(); + private Configuration configuration; + private MiniDFSCluster hdfsCluster; + private URI hdfsClusterBaseUri; + private HadoopFileSystem fileSystem; + + @Before + public void setUp() throws Exception { + configuration = new Configuration(); + configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath()); + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration); + hdfsCluster = builder.build(); + hdfsClusterBaseUri = new URI(configuration.get("fs.defaultFS") + "/"); + fileSystem = new HadoopFileSystem(configuration); + } + + @After + public void tearDown() throws Exception { + hdfsCluster.shutdown(); + } + + @Test + public void testCreateAndReadFile() throws Exception { + create("testFile", "testData".getBytes()); + assertArrayEquals("testData".getBytes(), read("testFile")); + } + + @Test + public void testCopy() throws Exception { + create("testFileA", "testDataA".getBytes()); + create("testFileB", "testDataB".getBytes()); + fileSystem.copy( + ImmutableList.of( + testPath("testFileA"), + testPath("testFileB")), + ImmutableList.of( + testPath("copyTestFileA"), + testPath("copyTestFileB"))); + assertArrayEquals("testDataA".getBytes(), read("testFileA")); + assertArrayEquals("testDataB".getBytes(), read("testFileB")); + assertArrayEquals("testDataA".getBytes(), read("copyTestFileA")); + assertArrayEquals("testDataB".getBytes(), read("copyTestFileB")); + } + + @Test + public void testDelete() throws Exception { + create("testFileA", "testDataA".getBytes()); + create("testFileB", "testDataB".getBytes()); + create("testFileC", "testDataC".getBytes()); + + // ensure files exist + assertArrayEquals("testDataA".getBytes(), read("testFileA")); + assertArrayEquals("testDataB".getBytes(), read("testFileB")); + assertArrayEquals("testDataC".getBytes(), read("testFileC")); + + fileSystem.delete(ImmutableList.of( + testPath("testFileA"), + testPath("testFileC"))); + + List<MatchResult> results = + fileSystem.match(ImmutableList.of(testPath("testFile*").toString())); + assertThat(results, contains(MatchResult.create(Status.OK, ImmutableList.of( + Metadata.builder() + .setResourceId(testPath("testFileB")) + .setIsReadSeekEfficient(true) + .setSizeBytes("testDataB".getBytes().length) + .build())))); + } + + @Test + public void testMatch() throws Exception { + create("testFileAA", "testDataAA".getBytes()); + create("testFileA", "testDataA".getBytes()); + create("testFileB", "testDataB".getBytes()); + + // ensure files exist + assertArrayEquals("testDataAA".getBytes(), read("testFileAA")); + assertArrayEquals("testDataA".getBytes(), read("testFileA")); + assertArrayEquals("testDataB".getBytes(), read("testFileB")); + + List<MatchResult> results = + fileSystem.match(ImmutableList.of(testPath("testFileA*").toString())); + assertEquals(Status.OK, Iterables.getOnlyElement(results).status()); + assertThat(Iterables.getOnlyElement(results).metadata(), containsInAnyOrder( + Metadata.builder() + .setResourceId(testPath("testFileAA")) + .setIsReadSeekEfficient(true) + .setSizeBytes("testDataAA".getBytes().length) + .build(), + Metadata.builder() + .setResourceId(testPath("testFileA")) + .setIsReadSeekEfficient(true) + .setSizeBytes("testDataA".getBytes().length) + .build())); + } + + @Test + public void 
testRename() throws Exception { + create("testFileA", "testDataA".getBytes()); + create("testFileB", "testDataB".getBytes()); + + // ensure files exist + assertArrayEquals("testDataA".getBytes(), read("testFileA")); + assertArrayEquals("testDataB".getBytes(), read("testFileB")); + + fileSystem.rename( + ImmutableList.of( + testPath("testFileA"), testPath("testFileB")), + ImmutableList.of( + testPath("renameFileA"), testPath("renameFileB"))); + + List<MatchResult> results = + fileSystem.match(ImmutableList.of(testPath("*").toString())); + assertEquals(Status.OK, Iterables.getOnlyElement(results).status()); + assertThat(Iterables.getOnlyElement(results).metadata(), containsInAnyOrder( + Metadata.builder() + .setResourceId(testPath("renameFileA")) + .setIsReadSeekEfficient(true) + .setSizeBytes("testDataA".getBytes().length) + .build(), + Metadata.builder() + .setResourceId(testPath("renameFileB")) + .setIsReadSeekEfficient(true) + .setSizeBytes("testDataB".getBytes().length) + .build())); + + // ensure files exist + assertArrayEquals("testDataA".getBytes(), read("renameFileA")); + assertArrayEquals("testDataB".getBytes(), read("renameFileB")); + } + + @Test + public void testMatchNewResource() throws Exception { + // match file spec + assertEquals(testPath("file"), + fileSystem.matchNewResource(testPath("file").toString(), false)); + // match dir spec missing '/' + assertEquals(testPath("dir/"), + fileSystem.matchNewResource(testPath("dir").toString(), true)); + // match dir spec with '/' + assertEquals(testPath("dir/"), + fileSystem.matchNewResource(testPath("dir/").toString(), true)); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Expected file path but received directory path"); + fileSystem.matchNewResource(testPath("dir/").toString(), false); + } + + @Test + public void testReadPipeline() throws Exception { + create("testFileA", "testDataA".getBytes()); + create("testFileB", "testDataB".getBytes()); + create("testFileC", "testDataC".getBytes()); + + HadoopFileSystemOptions options = TestPipeline.testingPipelineOptions() + .as(HadoopFileSystemOptions.class); + options.setHdfsConfiguration(ImmutableList.of(fileSystem.fileSystem.getConf())); + FileSystems.setDefaultConfigInWorkers(options); + PCollection<String> pc = p.apply(TextIO.Read.from(testPath("testFile*").toString())); + PAssert.that(pc).containsInAnyOrder("testDataA", "testDataB", "testDataC"); + p.run(); + } + + private void create(String relativePath, byte[] contents) throws Exception { + try (WritableByteChannel channel = fileSystem.create( + testPath(relativePath), + StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build())) { + channel.write(ByteBuffer.wrap(contents)); + } + } + + private byte[] read(String relativePath) throws Exception { + try (ReadableByteChannel channel = fileSystem.open(testPath(relativePath))) { + return ByteStreams.toByteArray(Channels.newInputStream(channel)); + } + } + + private HadoopResourceId testPath(String relativePath) { + return new HadoopResourceId(hdfsClusterBaseUri.resolve(relativePath)); + } +}
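----------------------------------------------------------------------

For context, a minimal usage sketch of the new file system from a pipeline author's point of view, based on the options and APIs exercised in HadoopFileSystemTest above. This sketch is not part of the commit: the class name, namenode address, and input path are placeholders, and how the Hadoop configuration is propagated to workers may differ per runner (the testReadPipeline test above calls FileSystems.setDefaultConfigInWorkers before running with the direct runner).

import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.hadoop.conf.Configuration;

public class HdfsReadSketch {
  public static void main(String[] args) {
    // Point Hadoop at a cluster; fs.defaultFS is a standard Hadoop property,
    // but hdfs://namenode:8020 is only a placeholder address.
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://namenode:8020");

    // The HadoopFileSystemRegistrar reads this list from HadoopFileSystemOptions
    // and currently supports at most one configuration.
    HadoopFileSystemOptions options =
        PipelineOptionsFactory.fromArgs(args).as(HadoopFileSystemOptions.class);
    options.setHdfsConfiguration(ImmutableList.of(conf));

    Pipeline p = Pipeline.create(options);
    // TextIO resolves hdfs:// paths through the registered HadoopFileSystem.
    p.apply(TextIO.Read.from("hdfs://namenode:8020/path/to/input*"));
    p.run();
  }
}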
