[BEAM-2135] Move hdfs to hadoop-file-system
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bacd33c8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bacd33c8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bacd33c8 Branch: refs/heads/master Commit: bacd33c81d99f4a3d1b11eb391a7f790087fc2a1 Parents: 3161904 Author: Luke Cwik <[email protected]> Authored: Tue May 2 09:20:49 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Tue May 2 12:57:44 2017 -0700 ---------------------------------------------------------------------- sdks/java/io/hadoop-file-system/README.md | 43 ++ sdks/java/io/hadoop-file-system/pom.xml | 195 ++++++ .../apache/beam/sdk/io/hdfs/HDFSFileSink.java | 478 ++++++++++++++ .../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 625 +++++++++++++++++++ .../beam/sdk/io/hdfs/HadoopFileSystem.java | 240 +++++++ .../sdk/io/hdfs/HadoopFileSystemModule.java | 84 +++ .../sdk/io/hdfs/HadoopFileSystemOptions.java | 49 ++ .../hdfs/HadoopFileSystemOptionsRegistrar.java | 35 ++ .../sdk/io/hdfs/HadoopFileSystemRegistrar.java | 62 ++ .../beam/sdk/io/hdfs/HadoopResourceId.java | 81 +++ .../java/org/apache/beam/sdk/io/hdfs/Sink.java | 195 ++++++ .../org/apache/beam/sdk/io/hdfs/UGIHelper.java | 38 ++ .../java/org/apache/beam/sdk/io/hdfs/Write.java | 585 +++++++++++++++++ .../apache/beam/sdk/io/hdfs/package-info.java | 22 + .../beam/sdk/io/hdfs/HDFSFileSinkTest.java | 172 +++++ .../beam/sdk/io/hdfs/HDFSFileSourceTest.java | 231 +++++++ .../sdk/io/hdfs/HadoopFileSystemModuleTest.java | 65 ++ .../HadoopFileSystemOptionsRegistrarTest.java | 49 ++ .../io/hdfs/HadoopFileSystemOptionsTest.java | 48 ++ .../io/hdfs/HadoopFileSystemRegistrarTest.java | 81 +++ .../beam/sdk/io/hdfs/HadoopFileSystemTest.java | 247 ++++++++ sdks/java/io/hdfs/README.md | 43 -- sdks/java/io/hdfs/pom.xml | 195 ------ .../apache/beam/sdk/io/hdfs/HDFSFileSink.java | 478 -------------- .../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 625 ------------------- .../beam/sdk/io/hdfs/HadoopFileSystem.java | 240 ------- .../sdk/io/hdfs/HadoopFileSystemModule.java | 84 --- .../sdk/io/hdfs/HadoopFileSystemOptions.java | 49 -- .../hdfs/HadoopFileSystemOptionsRegistrar.java | 35 -- .../sdk/io/hdfs/HadoopFileSystemRegistrar.java | 62 -- .../beam/sdk/io/hdfs/HadoopResourceId.java | 81 --- .../java/org/apache/beam/sdk/io/hdfs/Sink.java | 195 ------ .../org/apache/beam/sdk/io/hdfs/UGIHelper.java | 38 -- .../java/org/apache/beam/sdk/io/hdfs/Write.java | 585 ----------------- .../apache/beam/sdk/io/hdfs/package-info.java | 22 - .../beam/sdk/io/hdfs/HDFSFileSinkTest.java | 172 ----- .../beam/sdk/io/hdfs/HDFSFileSourceTest.java | 231 ------- .../sdk/io/hdfs/HadoopFileSystemModuleTest.java | 65 -- .../HadoopFileSystemOptionsRegistrarTest.java | 49 -- .../io/hdfs/HadoopFileSystemOptionsTest.java | 48 -- .../io/hdfs/HadoopFileSystemRegistrarTest.java | 81 --- .../beam/sdk/io/hdfs/HadoopFileSystemTest.java | 247 -------- sdks/java/io/pom.xml | 2 +- 43 files changed, 3626 insertions(+), 3626 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/README.md ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/README.md b/sdks/java/io/hadoop-file-system/README.md new file mode 100644 index 0000000..3a734f2 --- /dev/null +++ b/sdks/java/io/hadoop-file-system/README.md @@ -0,0 +1,43 @@ +<!-- + Licensed to the Apache Software 
Foundation (ASF) under one + or more contributor license agreements.  See the NOTICE file + distributed with this work for additional information + regarding copyright ownership.  The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License.  You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied.  See the License for the + specific language governing permissions and limitations + under the License. +--> + +# HDFS IO + +This library provides HDFS sources and sinks to make it possible to read and +write Apache Hadoop file formats from Apache Beam pipelines. + +Currently, only the read path is implemented. An `HDFSFileSource` allows any +Hadoop `FileInputFormat` to be read as a `PCollection`. + +An `HDFSFileSource` can be read from using the +`org.apache.beam.sdk.io.Read` transform. For example: + +```java +HDFSFileSource<K, V> source = HDFSFileSource.from(path, MyInputFormat.class, +  MyKey.class, MyValue.class); +PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(source)); +``` + +Alternatively, the `readFrom` method is a convenience method that returns a read +transform. For example: + +```java +PCollection<KV<MyKey, MyValue>> records = pipeline.apply(HDFSFileSource.readFrom(path, +  MyInputFormat.class, MyKey.class, MyValue.class)); +```
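Although the README documents only the read path, this commit also moves the write-side `HDFSFileSink` and its accompanying `Write` transform. A minimal write sketch, assuming a `PCollection<String>` named `records` and an illustrative output path, could look like:

```java
// Write each element's toString() as a line of text under the given path.
// The path below is illustrative; any Hadoop-supported filesystem URI works.
HDFSFileSink<String, NullWritable, Text> sink =
    HDFSFileSink.toText("hdfs://namenode/output/path");
records.apply(Write.to(sink));
```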
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/pom.xml b/sdks/java/io/hadoop-file-system/pom.xml new file mode 100644 index 0000000..3ec9848 --- /dev/null +++ b/sdks/java/io/hadoop-file-system/pom.xml @@ -0,0 +1,195 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-parent</artifactId> + <version>0.7.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId> + <name>Apache Beam :: SDKs :: Java :: IO :: Hadoop File System</name> + <description>Library to read and write Hadoop/HDFS file formats from Beam.</description> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemPropertyVariables> + <beamUseDummyRunner>false</beamUseDummyRunner> + </systemPropertyVariables> + </configuration> + </plugin> + </plugins> + </build> + + <properties> + <!-- + This is the version of Hadoop used to compile the hadoop-common module. + This dependency is defined with a provided scope. + Users must supply their own Hadoop version at runtime. + --> + <hadoop.version>2.7.3</hadoop.version> + </properties> + + <dependencyManagement> + <!-- + We define dependencies here instead of sdks/java/io because + of a version mismatch between this Hadoop version and other + Hadoop versions declared in other io submodules. + --> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <classifier>tests</classifier> + <version>${hadoop.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <version>${hadoop.version}</version> + </dependency> + </dependencies> + </dependencyManagement> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-hadoop-common</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + + <dependency> + <groupId>com.google.auto.service</groupId> + <artifactId>auto-service</artifactId> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>com.google.auto.value</groupId> + <artifactId>auto-value</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-mapred</artifactId> + <version>${avro.version}</version> + <classifier>hadoop2</classifier> + <exclusions> + <!-- exclude old Jetty version of servlet API --> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> +
<scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <scope>provided</scope> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java new file mode 100644 index 0000000..aee73c4 --- /dev/null +++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.hdfs; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import java.io.IOException; +import java.net.URI; +import java.security.PrivilegedExceptionAction; +import java.util.Random; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.AvroKey; +import org.apache.avro.mapreduce.AvroKeyOutputFormat; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.KV; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +/** + * A {@link Sink} for writing records to a Hadoop filesystem using a Hadoop file-based output + * format. + * + * <p>To write a {@link org.apache.beam.sdk.values.PCollection} of elements of type T to a Hadoop + * filesystem, use {@link HDFSFileSink#to}: specify the path (this can be any Hadoop-supported + * filesystem: HDFS, S3, GCS, etc.), the Hadoop {@link FileOutputFormat}, the key class K, the + * value class V, and finally the {@link SerializableFunction} to map from T to a {@link KV} of K + * and V. + * + * <p>{@code HDFSFileSink} can be used by {@link Write} to create a write + * transform. See the example below. + * + * <p>{@code HDFSFileSink} comes with helper methods to write text and Apache Avro. For example: + * + * <pre> + * {@code + * Configuration conf = new Configuration(false); + * HDFSFileSink<CustomSpecificAvroClass, AvroKey<CustomSpecificAvroClass>, NullWritable> sink = + *     HDFSFileSink.toAvro(path, AvroCoder.of(CustomSpecificAvroClass.class), conf); + * avroRecordsPCollection.apply(Write.to(sink)); + * } + * </pre> + * + * @param <T> the type of elements of the input {@link org.apache.beam.sdk.values.PCollection}. + * @param <K> the type of keys to be written to the sink via {@link FileOutputFormat}. + * @param <V> the type of values to be written to the sink via {@link FileOutputFormat}.
+ */ +@AutoValue +@Experimental +public abstract class HDFSFileSink<T, K, V> extends Sink<T> { + + private static final JobID jobId = new JobID( + Long.toString(System.currentTimeMillis()), + new Random().nextInt(Integer.MAX_VALUE)); + + public abstract String path(); + public abstract Class<? extends FileOutputFormat<K, V>> formatClass(); + public abstract Class<K> keyClass(); + public abstract Class<V> valueClass(); + public abstract SerializableFunction<T, KV<K, V>> outputConverter(); + public abstract SerializableConfiguration serializableConfiguration(); + public @Nullable abstract String username(); + public abstract boolean validate(); + + // ======================================================================= + // Factory methods + // ======================================================================= + + public static <T, K, V, W extends FileOutputFormat<K, V>> HDFSFileSink<T, K, V> + to(String path, + Class<W> formatClass, + Class<K> keyClass, + Class<V> valueClass, + SerializableFunction<T, KV<K, V>> outputConverter) { + return HDFSFileSink.<T, K, V>builder() + .setPath(path) + .setFormatClass(formatClass) + .setKeyClass(keyClass) + .setValueClass(valueClass) + .setOutputConverter(outputConverter) + .setConfiguration(null) + .setUsername(null) + .setValidate(true) + .build(); + } + + public static <T> HDFSFileSink<T, NullWritable, Text> toText(String path) { + SerializableFunction<T, KV<NullWritable, Text>> outputConverter = + new SerializableFunction<T, KV<NullWritable, Text>>() { + @Override + public KV<NullWritable, Text> apply(T input) { + return KV.of(NullWritable.get(), new Text(input.toString())); + } + }; + return to(path, TextOutputFormat.class, NullWritable.class, Text.class, outputConverter); + } + + /** + * Helper to create Avro sink given {@link AvroCoder}. Keep in mind that configuration + * object is altered to enable Avro output. + */ + public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String path, + final AvroCoder<T> coder, + Configuration conf) { + SerializableFunction<T, KV<AvroKey<T>, NullWritable>> outputConverter = + new SerializableFunction<T, KV<AvroKey<T>, NullWritable>>() { + @Override + public KV<AvroKey<T>, NullWritable> apply(T input) { + return KV.of(new AvroKey<>(input), NullWritable.get()); + } + }; + conf.set("avro.schema.output.key", coder.getSchema().toString()); + return to( + path, + AvroKeyOutputFormat.class, + (Class<AvroKey<T>>) (Class<?>) AvroKey.class, + NullWritable.class, + outputConverter).withConfiguration(conf); + } + + /** + * Helper to create Avro sink given {@link Schema}. Keep in mind that configuration + * object is altered to enable Avro output. + */ + public static HDFSFileSink<GenericRecord, AvroKey<GenericRecord>, NullWritable> + toAvro(String path, Schema schema, Configuration conf) { + return toAvro(path, AvroCoder.of(schema), conf); + } + + /** + * Helper to create Avro sink given {@link Class}. Keep in mind that configuration + * object is altered to enable Avro output. 
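+ * + * <p>For example, a sketch assuming a generated Avro class {@code MyAvroRecord} and a + * {@code PCollection<MyAvroRecord>} named {@code avroRecords} (both illustrative names): + * <pre>{@code + * Configuration conf = new Configuration(false); + * HDFSFileSink<MyAvroRecord, AvroKey<MyAvroRecord>, NullWritable> sink = + *     HDFSFileSink.toAvro(path, MyAvroRecord.class, conf); + * avroRecords.apply(Write.to(sink)); + * }</pre>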
+ */ + public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String path, + Class<T> cls, + Configuration conf) { + return toAvro(path, AvroCoder.of(cls), conf); + } + + // ======================================================================= + // Builder methods + // ======================================================================= + + public abstract Builder<T, K, V> toBuilder(); + public static <T, K, V> Builder builder() { + return new AutoValue_HDFSFileSink.Builder<>(); + } + + /** + * AutoValue builder for {@link HDFSFileSink}. + */ + @AutoValue.Builder + public abstract static class Builder<T, K, V> { + public abstract Builder<T, K, V> setPath(String path); + public abstract Builder<T, K, V> setFormatClass( + Class<? extends FileOutputFormat<K, V>> formatClass); + public abstract Builder<T, K, V> setKeyClass(Class<K> keyClass); + public abstract Builder<T, K, V> setValueClass(Class<V> valueClass); + public abstract Builder<T, K, V> setOutputConverter( + SerializableFunction<T, KV<K, V>> outputConverter); + public abstract Builder<T, K, V> setSerializableConfiguration( + SerializableConfiguration serializableConfiguration); + public Builder<T, K, V> setConfiguration(@Nullable Configuration configuration) { + if (configuration == null) { + configuration = new Configuration(false); + } + return this.setSerializableConfiguration(new SerializableConfiguration(configuration)); + } + public abstract Builder<T, K, V> setUsername(String username); + public abstract Builder<T, K, V> setValidate(boolean validate); + public abstract HDFSFileSink<T, K, V> build(); + } + + public HDFSFileSink<T, K, V> withConfiguration(@Nullable Configuration configuration) { + return this.toBuilder().setConfiguration(configuration).build(); + } + + public HDFSFileSink<T, K, V> withUsername(@Nullable String username) { + return this.toBuilder().setUsername(username).build(); + } + + // ======================================================================= + // Sink + // ======================================================================= + + @Override + public void validate(PipelineOptions options) { + if (validate()) { + try { + UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + FileSystem fs = FileSystem.get(new URI(path()), + SerializableConfiguration.newConfiguration(serializableConfiguration())); + checkState(!fs.exists(new Path(path())), "Output path %s already exists", path()); + return null; + } + }); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public Sink.WriteOperation<T, String> createWriteOperation() { + return new HDFSWriteOperation<>(this, path(), formatClass()); + } + + private Job newJob() throws IOException { + Job job = SerializableConfiguration.newJob(serializableConfiguration()); + job.setJobID(jobId); + job.setOutputKeyClass(keyClass()); + job.setOutputValueClass(valueClass()); + return job; + } + + // ======================================================================= + // WriteOperation + // ======================================================================= + + /** {{@link WriteOperation}} for HDFS. */ + private static class HDFSWriteOperation<T, K, V> extends WriteOperation<T, String> { + + private final HDFSFileSink<T, K, V> sink; + private final String path; + private final Class<? extends FileOutputFormat<K, V>> formatClass; + + HDFSWriteOperation(HDFSFileSink<T, K, V> sink, + String path, + Class<? 
extends FileOutputFormat<K, V>> formatClass) { + this.sink = sink; + this.path = path; + this.formatClass = formatClass; + } + + @Override + public void initialize(PipelineOptions options) throws Exception { + Job job = sink.newJob(); + FileOutputFormat.setOutputPath(job, new Path(path)); + } + + @Override + public void setWindowedWrites(boolean windowedWrites) { + } + + @Override + public void finalize(final Iterable<String> writerResults, PipelineOptions options) + throws Exception { + UGIHelper.getBestUGI(sink.username()).doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + doFinalize(writerResults); + return null; + } + }); + } + + private void doFinalize(Iterable<String> writerResults) throws Exception { + Job job = sink.newJob(); + FileSystem fs = FileSystem.get(new URI(path), job.getConfiguration()); + // If there are 0 output shards, just create output folder. + if (!writerResults.iterator().hasNext()) { + fs.mkdirs(new Path(path)); + return; + } + + // job successful + JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID()); + FileOutputCommitter outputCommitter = new FileOutputCommitter(new Path(path), context); + outputCommitter.commitJob(context); + + // get actual output shards + Set<String> actual = Sets.newHashSet(); + FileStatus[] statuses = fs.listStatus(new Path(path), new PathFilter() { + @Override + public boolean accept(Path path) { + String name = path.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }); + + // get expected output shards + Set<String> expected = Sets.newHashSet(writerResults); + checkState( + expected.size() == Lists.newArrayList(writerResults).size(), + "Data loss due to writer results hash collision"); + for (FileStatus s : statuses) { + String name = s.getPath().getName(); + int pos = name.indexOf('.'); + actual.add(pos > 0 ? name.substring(0, pos) : name); + } + + checkState(actual.equals(expected), "Writer results and output files do not match"); + + // rename output shards to Hadoop style, i.e. part-r-00000.txt + int i = 0; + for (FileStatus s : statuses) { + String name = s.getPath().getName(); + int pos = name.indexOf('.'); + String ext = pos > 0 ? name.substring(pos) : ""; + fs.rename( + s.getPath(), + new Path(s.getPath().getParent(), String.format("part-r-%05d%s", i, ext))); + i++; + } + } + + @Override + public Writer<T, String> createWriter(PipelineOptions options) throws Exception { + return new HDFSWriter<>(this, path, formatClass); + } + + @Override + public Sink<T> getSink() { + return sink; + } + + @Override + public Coder<String> getWriterResultCoder() { + return StringUtf8Coder.of(); + } + + } + + // ======================================================================= + // Writer + // ======================================================================= + + private static class HDFSWriter<T, K, V> extends Writer<T, String> { + + private final HDFSWriteOperation<T, K, V> writeOperation; + private final String path; + private final Class<? extends FileOutputFormat<K, V>> formatClass; + + // unique hash for each task + private int hash; + + private TaskAttemptContext context; + private RecordWriter<K, V> recordWriter; + private FileOutputCommitter outputCommitter; + + HDFSWriter(HDFSWriteOperation<T, K, V> writeOperation, + String path, + Class<? 
extends FileOutputFormat<K, V>> formatClass) { + this.writeOperation = writeOperation; + this.path = path; + this.formatClass = formatClass; + } + + @Override + public void openWindowed(final String uId, + BoundedWindow window, + PaneInfo paneInfo, + int shard, + int numShards) throws Exception { + throw new UnsupportedOperationException("Windowing support not implemented yet for " + + "HDFS. Window " + window); + } + + @Override + public void openUnwindowed(final String uId, int shard, int numShards) throws Exception { + UGIHelper.getBestUGI(writeOperation.sink.username()).doAs( + new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + doOpen(uId); + return null; + } + } + ); + } + + private void doOpen(String uId) throws Exception { + this.hash = uId.hashCode(); + + Job job = writeOperation.sink.newJob(); + FileOutputFormat.setOutputPath(job, new Path(path)); + + // Each Writer is responsible for writing one bundle of elements and is represented by one + // unique Hadoop task based on uId/hash. All tasks share the same job ID. + JobID jobId = job.getJobID(); + TaskID taskId = new TaskID(jobId, TaskType.REDUCE, hash); + context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID(taskId, 0)); + + FileOutputFormat<K, V> outputFormat = formatClass.newInstance(); + recordWriter = outputFormat.getRecordWriter(context); + outputCommitter = (FileOutputCommitter) outputFormat.getOutputCommitter(context); + } + + @Override + public void write(T value) throws Exception { + checkNotNull(recordWriter, + "Record writer can't be null. Make sure to open Writer first!"); + KV<K, V> kv = writeOperation.sink.outputConverter().apply(value); + recordWriter.write(kv.getKey(), kv.getValue()); + } + + @Override + public void cleanup() throws Exception { + + } + + @Override + public String close() throws Exception { + return UGIHelper.getBestUGI(writeOperation.sink.username()).doAs( + new PrivilegedExceptionAction<String>() { + @Override + public String run() throws Exception { + return doClose(); + } + }); + } + + private String doClose() throws Exception { + // task/attempt successful + recordWriter.close(context); + outputCommitter.commitTask(context); + + // the result is the prefix of the output file name + return String.format("part-r-%d", hash); + } + + @Override + public WriteOperation<T, String> getWriteOperation() { + return writeOperation; + } + + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java new file mode 100644 index 0000000..5cc2097 --- /dev/null +++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java @@ -0,0 +1,625 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hdfs; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.auto.value.AutoValue; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.security.PrivilegedExceptionAction; +import java.util.List; +import java.util.ListIterator; +import java.util.NoSuchElementException; +import javax.annotation.Nullable; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.AvroKey; +import org.apache.avro.mapreduce.AvroKeyInputFormat; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; +import org.apache.beam.sdk.io.hadoop.WritableCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.KV; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@code BoundedSource} for reading files resident in a Hadoop filesystem using a + * Hadoop file-based input format. + * + * <p>To read a {@link org.apache.beam.sdk.values.PCollection} of + * {@link org.apache.beam.sdk.values.KV} key-value pairs from one or more + * Hadoop files, use {@link HDFSFileSource#from} to specify the path(s) of the files to + * read, the Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, the + * key class and the value class. + * + * <p>An {@code HDFSFileSource} can be read from using the + * {@link org.apache.beam.sdk.io.Read} transform.
For example: + * + * <pre> + * {@code + * HDFSFileSource<K, V> source = HDFSFileSource.from(path, MyInputFormat.class, + *   MyKey.class, MyValue.class); + * PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(source)); + * } + * </pre> + * + * <p>Implementation note: Since Hadoop's + * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat} + * determines the input splits, this class extends {@link BoundedSource} rather than + * {@link org.apache.beam.sdk.io.OffsetBasedSource}, since the latter + * dictates input splits. + * @param <T> the type of elements of the result {@link org.apache.beam.sdk.values.PCollection}. + * @param <K> the type of keys to be read from the source via {@link FileInputFormat}. + * @param <V> the type of values to be read from the source via {@link FileInputFormat}. + */ +@AutoValue +@Experimental +public abstract class HDFSFileSource<T, K, V> extends BoundedSource<T> { + private static final long serialVersionUID = 0L; + + private static final Logger LOG = LoggerFactory.getLogger(HDFSFileSource.class); + + public abstract String filepattern(); + public abstract Class<? extends FileInputFormat<K, V>> formatClass(); + public abstract Coder<T> coder(); + public abstract SerializableFunction<KV<K, V>, T> inputConverter(); + public abstract SerializableConfiguration serializableConfiguration(); + public @Nullable abstract SerializableSplit serializableSplit(); + public @Nullable abstract String username(); + public abstract boolean validateSource(); + + // ======================================================================= + // Factory methods + // ======================================================================= + + public static <T, K, V, W extends FileInputFormat<K, V>> HDFSFileSource<T, K, V> + from(String filepattern, + Class<W> formatClass, + Coder<T> coder, + SerializableFunction<KV<K, V>, T> inputConverter) { + return HDFSFileSource.<T, K, V>builder() + .setFilepattern(filepattern) + .setFormatClass(formatClass) + .setCoder(coder) + .setInputConverter(inputConverter) + .setConfiguration(null) + .setUsername(null) + .setValidateSource(true) + .setSerializableSplit(null) + .build(); + } + + public static <K, V, W extends FileInputFormat<K, V>> HDFSFileSource<KV<K, V>, K, V> + from(String filepattern, + Class<W> formatClass, + Class<K> keyClass, + Class<V> valueClass) { + KvCoder<K, V> coder = KvCoder.of(getDefaultCoder(keyClass), getDefaultCoder(valueClass)); + SerializableFunction<KV<K, V>, KV<K, V>> inputConverter = + new SerializableFunction<KV<K, V>, KV<K, V>>() { + @Override + public KV<K, V> apply(KV<K, V> input) { + return input; + } + }; + return HDFSFileSource.<KV<K, V>, K, V>builder() + .setFilepattern(filepattern) + .setFormatClass(formatClass) + .setCoder(coder) + .setInputConverter(inputConverter) + .setConfiguration(null) + .setUsername(null) + .setValidateSource(true) + .setSerializableSplit(null) + .build(); + } + + public static HDFSFileSource<String, LongWritable, Text> + fromText(String filepattern) { + SerializableFunction<KV<LongWritable, Text>, String> inputConverter = + new SerializableFunction<KV<LongWritable, Text>, String>() { + @Override + public String apply(KV<LongWritable, Text> input) { + return input.getValue().toString(); + } + }; + return from(filepattern, TextInputFormat.class, StringUtf8Coder.of(), inputConverter); + } + + /** + * Helper to read from an Avro source given {@link AvroCoder}. Keep in mind that the configuration + * object is altered to enable Avro input.
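+ * + * <p>For example, a sketch assuming a generated Avro class {@code MyAvroRecord} (an + * illustrative name): + * <pre>{@code + * Configuration conf = new Configuration(false); + * HDFSFileSource<MyAvroRecord, AvroKey<MyAvroRecord>, NullWritable> source = + *     HDFSFileSource.fromAvro(filepattern, AvroCoder.of(MyAvroRecord.class), conf); + * }</pre>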
+ */ + public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable> + fromAvro(String filepattern, final AvroCoder<T> coder, Configuration conf) { + Class<AvroKeyInputFormat<T>> formatClass = castClass(AvroKeyInputFormat.class); + SerializableFunction<KV<AvroKey<T>, NullWritable>, T> inputConverter = + new SerializableFunction<KV<AvroKey<T>, NullWritable>, T>() { + @Override + public T apply(KV<AvroKey<T>, NullWritable> input) { + try { + return CoderUtils.clone(coder, input.getKey().datum()); + } catch (CoderException e) { + throw new RuntimeException(e); + } + } + }; + conf.set("avro.schema.input.key", coder.getSchema().toString()); + return from(filepattern, formatClass, coder, inputConverter).withConfiguration(conf); + } + + /** + * Helper to read from Avro source given {@link Schema}. Keep in mind that configuration + * object is altered to enable Avro input. + */ + public static HDFSFileSource<GenericRecord, AvroKey<GenericRecord>, NullWritable> + fromAvro(String filepattern, Schema schema, Configuration conf) { + return fromAvro(filepattern, AvroCoder.of(schema), conf); + } + + /** + * Helper to read from Avro source given {@link Class}. Keep in mind that configuration + * object is altered to enable Avro input. + */ + public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable> + fromAvro(String filepattern, Class<T> cls, Configuration conf) { + return fromAvro(filepattern, AvroCoder.of(cls), conf); + } + + // ======================================================================= + // Builder methods + // ======================================================================= + + public abstract HDFSFileSource.Builder<T, K, V> toBuilder(); + public static <T, K, V> HDFSFileSource.Builder builder() { + return new AutoValue_HDFSFileSource.Builder<>(); + } + + /** + * AutoValue builder for {@link HDFSFileSource}. + */ + @AutoValue.Builder + public abstract static class Builder<T, K, V> { + public abstract Builder<T, K, V> setFilepattern(String filepattern); + public abstract Builder<T, K, V> setFormatClass( + Class<? extends FileInputFormat<K, V>> formatClass); + public abstract Builder<T, K, V> setCoder(Coder<T> coder); + public abstract Builder<T, K, V> setInputConverter( + SerializableFunction<KV<K, V>, T> inputConverter); + public abstract Builder<T, K, V> setSerializableConfiguration( + SerializableConfiguration serializableConfiguration); + public Builder<T, K, V> setConfiguration(Configuration configuration) { + if (configuration == null) { + configuration = new Configuration(false); + } + return this.setSerializableConfiguration(new SerializableConfiguration(configuration)); + } + public abstract Builder<T, K, V> setSerializableSplit(SerializableSplit serializableSplit); + public abstract Builder<T, K, V> setUsername(@Nullable String username); + public abstract Builder<T, K, V> setValidateSource(boolean validate); + public abstract HDFSFileSource<T, K, V> build(); + } + + public HDFSFileSource<T, K, V> withConfiguration(@Nullable Configuration configuration) { + return this.toBuilder().setConfiguration(configuration).build(); + } + + public HDFSFileSource<T, K, V> withUsername(@Nullable String username) { + return this.toBuilder().setUsername(username).build(); + } + + // ======================================================================= + // BoundedSource + // ======================================================================= + + @Override + public List<? 
extends BoundedSource<T>> split( + final long desiredBundleSizeBytes, + PipelineOptions options) throws Exception { + if (serializableSplit() == null) { + List<InputSplit> inputSplits = UGIHelper.getBestUGI(username()).doAs( + new PrivilegedExceptionAction<List<InputSplit>>() { + @Override + public List<InputSplit> run() throws Exception { + return computeSplits(desiredBundleSizeBytes, serializableConfiguration()); + } + }); + return Lists.transform(inputSplits, + new Function<InputSplit, BoundedSource<T>>() { + @Override + public BoundedSource<T> apply(@Nullable InputSplit inputSplit) { + SerializableSplit serializableSplit = new SerializableSplit(inputSplit); + return HDFSFileSource.this.toBuilder() + .setSerializableSplit(serializableSplit) + .build(); + } + }); + } else { + return ImmutableList.of(this); + } + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) { + long size = 0; + + try { + // If this source represents a single split produced by split(), then return the size of + // that split, rather than the size of the entire input + if (serializableSplit() != null) { + return serializableSplit().getSplit().getLength(); + } + + size += UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Long>() { + @Override + public Long run() throws Exception { + long size = 0; + Job job = SerializableConfiguration.newJob(serializableConfiguration()); + for (FileStatus st : listStatus(createFormat(job), job)) { + size += st.getLen(); + } + return size; + } + }); + } catch (IOException e) { + LOG.warn( + "Will estimate size of input to be 0 bytes. Can't estimate size of the input due to:", e); + // ignore, and return 0 + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn( + "Will estimate size of input to be 0 bytes.
Can't estimate size of the input due to:", e); + // ignore, and return 0 + } + return size; + } + + @Override + public BoundedReader<T> createReader(PipelineOptions options) throws IOException { + this.validate(); + return new HDFSFileReader<>(this, filepattern(), formatClass(), serializableSplit()); + } + + @Override + public void validate() { + if (validateSource()) { + try { + UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + final Path pathPattern = new Path(filepattern()); + FileSystem fs = FileSystem.get(pathPattern.toUri(), + SerializableConfiguration.newConfiguration(serializableConfiguration())); + FileStatus[] fileStatuses = fs.globStatus(pathPattern); + checkState( + fileStatuses != null && fileStatuses.length > 0, + "Unable to find any files matching %s", filepattern()); + return null; + } + }); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public Coder<T> getDefaultOutputCoder() { + return coder(); + } + + // ======================================================================= + // Helpers + // ======================================================================= + + private List<InputSplit> computeSplits(long desiredBundleSizeBytes, + SerializableConfiguration serializableConfiguration) + throws IOException, IllegalAccessException, InstantiationException { + Job job = SerializableConfiguration.newJob(serializableConfiguration); + FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes); + FileInputFormat.setMaxInputSplitSize(job, desiredBundleSizeBytes); + return createFormat(job).getSplits(job); + } + + private FileInputFormat<K, V> createFormat(Job job) + throws IOException, IllegalAccessException, InstantiationException { + Path path = new Path(filepattern()); + FileInputFormat.addInputPath(job, path); + return formatClass().newInstance(); + } + + private List<FileStatus> listStatus(FileInputFormat<K, V> format, Job job) + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + // FileInputFormat#listStatus is protected, so call using reflection + Method listStatus = FileInputFormat.class.getDeclaredMethod("listStatus", JobContext.class); + listStatus.setAccessible(true); + @SuppressWarnings("unchecked") + List<FileStatus> stat = (List<FileStatus>) listStatus.invoke(format, job); + return stat; + } + + @SuppressWarnings("unchecked") + private static <T> Coder<T> getDefaultCoder(Class<T> c) { + if (Writable.class.isAssignableFrom(c)) { + Class<? extends Writable> writableClass = (Class<? extends Writable>) c; + return (Coder<T>) WritableCoder.of(writableClass); + } else if (Void.class.equals(c)) { + return (Coder<T>) VoidCoder.of(); + } + // TODO: how to use registered coders here? + throw new IllegalStateException("Cannot find coder for " + c); + } + + @SuppressWarnings("unchecked") + private static <T> Class<T> castClass(Class<?> aClass) { + return (Class<T>) aClass; + } + + // ======================================================================= + // BoundedReader + // ======================================================================= + + private static class HDFSFileReader<T, K, V> extends BoundedSource.BoundedReader<T> { + + private final HDFSFileSource<T, K, V> source; + private final String filepattern; + private final Class<? 
extends FileInputFormat<K, V>> formatClass; + private final Job job; + + private List<InputSplit> splits; + private ListIterator<InputSplit> splitsIterator; + + private Configuration conf; + private FileInputFormat<?, ?> format; + private TaskAttemptContext attemptContext; + private RecordReader<K, V> currentReader; + private KV<K, V> currentPair; + + HDFSFileReader( + HDFSFileSource<T, K, V> source, + String filepattern, + Class<? extends FileInputFormat<K, V>> formatClass, + SerializableSplit serializableSplit) + throws IOException { + this.source = source; + this.filepattern = filepattern; + this.formatClass = formatClass; + this.job = SerializableConfiguration.newJob(source.serializableConfiguration()); + + if (serializableSplit != null) { + this.splits = ImmutableList.of(serializableSplit.getSplit()); + this.splitsIterator = splits.listIterator(); + } + } + + @Override + public boolean start() throws IOException { + Path path = new Path(filepattern); + FileInputFormat.addInputPath(job, path); + + conf = job.getConfiguration(); + try { + format = formatClass.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new IOException("Cannot instantiate file input format " + formatClass, e); + } + attemptContext = new TaskAttemptContextImpl(conf, new TaskAttemptID()); + + if (splitsIterator == null) { + splits = format.getSplits(job); + splitsIterator = splits.listIterator(); + } + + return advance(); + } + + @Override + public boolean advance() throws IOException { + try { + if (currentReader != null && currentReader.nextKeyValue()) { + currentPair = nextPair(); + return true; + } else { + while (splitsIterator.hasNext()) { + // advance the reader and see if it has records + final InputSplit nextSplit = splitsIterator.next(); + @SuppressWarnings("unchecked") + RecordReader<K, V> reader = + (RecordReader<K, V>) format.createRecordReader(nextSplit, attemptContext); + if (currentReader != null) { + currentReader.close(); + } + currentReader = reader; + UGIHelper.getBestUGI(source.username()).doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + currentReader.initialize(nextSplit, attemptContext); + return null; + } + }); + if (currentReader.nextKeyValue()) { + currentPair = nextPair(); + return true; + } + currentReader.close(); + currentReader = null; + } + // either no next split or all readers were empty + currentPair = null; + return false; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + } + + @Override + public T getCurrent() throws NoSuchElementException { + if (currentPair == null) { + throw new NoSuchElementException(); + } + return source.inputConverter().apply(currentPair); + } + + @Override + public void close() throws IOException { + if (currentReader != null) { + currentReader.close(); + currentReader = null; + } + currentPair = null; + } + + @Override + public BoundedSource<T> getCurrentSource() { + return source; + } + + @SuppressWarnings("unchecked") + private KV<K, V> nextPair() throws IOException, InterruptedException { + K key = currentReader.getCurrentKey(); + V value = currentReader.getCurrentValue(); + // clone Writable objects since they are reused between calls to RecordReader#nextKeyValue + if (key instanceof Writable) { + key = (K) WritableUtils.clone((Writable) key, conf); + } + if (value instanceof Writable) { + value = (V) WritableUtils.clone((Writable) value, conf); + } + return KV.of(key, value); + } + + // 
======================================================================= + // Optional overrides + // ======================================================================= + + @Override + public Double getFractionConsumed() { + if (currentReader == null) { + return 0.0; + } + if (splits.isEmpty()) { + return 1.0; + } + int index = splitsIterator.previousIndex(); + int numReaders = splits.size(); + if (index == numReaders) { + return 1.0; + } + double before = 1.0 * index / numReaders; + double after = 1.0 * (index + 1) / numReaders; + Double fractionOfCurrentReader = getProgress(); + if (fractionOfCurrentReader == null) { + return before; + } + return before + fractionOfCurrentReader * (after - before); + } + + private Double getProgress() { + try { + return (double) currentReader.getProgress(); + } catch (IOException | InterruptedException e) { + return null; + } + } + + } + + // ======================================================================= + // SerializableSplit + // ======================================================================= + + /** + * A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit}s to be + * serialized using Java's standard serialization mechanisms. Note that the InputSplit + * has to be Writable (which most are). + */ + protected static class SerializableSplit implements Externalizable { + private static final long serialVersionUID = 0L; + + private InputSplit split; + + public SerializableSplit() { + } + + public SerializableSplit(InputSplit split) { + checkArgument(split instanceof Writable, "Split is not writable: %s", split); + this.split = split; + } + + public InputSplit getSplit() { + return split; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeUTF(split.getClass().getCanonicalName()); + ((Writable) split).write(out); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + String className = in.readUTF(); + try { + split = (InputSplit) Class.forName(className).newInstance(); + ((Writable) split).readFields(in); + } catch (InstantiationException | IllegalAccessException e) { + throw new IOException(e); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java new file mode 100644 index 0000000..154a818 --- /dev/null +++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hdfs; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SeekableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.beam.sdk.io.FileSystem; +import org.apache.beam.sdk.io.fs.CreateOptions; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.MatchResult.Status; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; + +/** + * Adapts {@link org.apache.hadoop.fs.FileSystem} connectors to be used as + * Apache Beam {@link FileSystem FileSystems}. + * + * <p>The following HDFS FileSystem(s) are known to be unsupported: + * <ul> + * <li>FTPFileSystem: Missing seek support within FTPInputStream</li> + * </ul> + * + * <p>This implementation assumes that the underlying Hadoop {@link FileSystem} is seek + * efficient when reading. The source code for the following {@link FSInputStream} implementations + * (as of Hadoop 2.7.1) do provide seek implementations: + * <ul> + * <li>HarFsInputStream</li> + * <li>S3InputStream</li> + * <li>DFSInputStream</li> + * <li>SwiftNativeInputStream</li> + * <li>NativeS3FsInputStream</li> + * <li>LocalFSFileInputStream</li> + * <li>NativeAzureFsInputStream</li> + * <li>S3AInputStream</li> + * </ul> + */ +class HadoopFileSystem extends FileSystem<HadoopResourceId> { + @VisibleForTesting + final org.apache.hadoop.fs.FileSystem fileSystem; + + HadoopFileSystem(Configuration configuration) throws IOException { + this.fileSystem = org.apache.hadoop.fs.FileSystem.newInstance(configuration); + } + + @Override + protected List<MatchResult> match(List<String> specs) { + ImmutableList.Builder<MatchResult> resultsBuilder = ImmutableList.builder(); + for (String spec : specs) { + try { + FileStatus[] fileStatuses = fileSystem.globStatus(new Path(spec)); + List<Metadata> metadata = new ArrayList<>(); + for (FileStatus fileStatus : fileStatuses) { + if (fileStatus.isFile()) { + metadata.add(Metadata.builder() + .setResourceId(new HadoopResourceId(fileStatus.getPath().toUri())) + .setIsReadSeekEfficient(true) + .setSizeBytes(fileStatus.getLen()) + .build()); + } + } + resultsBuilder.add(MatchResult.create(Status.OK, metadata)); + } catch (IOException e) { + resultsBuilder.add(MatchResult.create(Status.ERROR, e)); + } + } + return resultsBuilder.build(); + } + + @Override + protected WritableByteChannel create(HadoopResourceId resourceId, CreateOptions createOptions) + throws IOException { + return Channels.newChannel(fileSystem.create(resourceId.toPath())); + } + + @Override + protected ReadableByteChannel open(HadoopResourceId resourceId) throws IOException { + FileStatus fileStatus = fileSystem.getFileStatus(resourceId.toPath()); + return new HadoopSeekableByteChannel(fileStatus, fileSystem.open(resourceId.toPath())); + } + + @Override + protected void copy( + 
List<HadoopResourceId> srcResourceIds, + List<HadoopResourceId> destResourceIds) throws IOException { + for (int i = 0; i < srcResourceIds.size(); ++i) { + // Unfortunately, HDFS FileSystems don't support a native copy operation, so we are forced + // to use the inefficient implementation found in FileUtil, which copies all the bytes through + // the local machine. + // + // The HDFS FileSystem API does define a concat method, but only DFSFileSystem was found to + // implement it, and DFSFileSystem implements concat by deleting the srcs afterwards, which + // is not what we want. Also, all the other FileSystem implementations examined threw + // UnsupportedOperationException within concat. + FileUtil.copy( + fileSystem, srcResourceIds.get(i).toPath(), + fileSystem, destResourceIds.get(i).toPath(), + false, + true, + fileSystem.getConf()); + } + } + + @Override + protected void rename( + List<HadoopResourceId> srcResourceIds, + List<HadoopResourceId> destResourceIds) throws IOException { + for (int i = 0; i < srcResourceIds.size(); ++i) { + fileSystem.rename( + srcResourceIds.get(i).toPath(), + destResourceIds.get(i).toPath()); + } + } + + @Override + protected void delete(Collection<HadoopResourceId> resourceIds) throws IOException { + for (HadoopResourceId resourceId : resourceIds) { + fileSystem.delete(resourceId.toPath(), false); + } + } + + @Override + protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) { + try { + if (singleResourceSpec.endsWith("/") && !isDirectory) { + throw new IllegalArgumentException(String.format( + "Expected file path but received directory path %s", singleResourceSpec)); + } + return !singleResourceSpec.endsWith("/") && isDirectory + ? new HadoopResourceId(new URI(singleResourceSpec + "/")) + : new HadoopResourceId(new URI(singleResourceSpec)); + } catch (URISyntaxException e) { + throw new IllegalArgumentException( + String.format("Invalid spec %s directory %s", singleResourceSpec, isDirectory), + e); + } + } + + @Override + protected String getScheme() { + return fileSystem.getScheme(); + } + + /** An adapter around {@link FSDataInputStream} that implements {@link SeekableByteChannel}.
+   */
+  private static class HadoopSeekableByteChannel implements SeekableByteChannel {
+    private final FileStatus fileStatus;
+    private final FSDataInputStream inputStream;
+    private boolean closed;
+
+    private HadoopSeekableByteChannel(FileStatus fileStatus, FSDataInputStream inputStream) {
+      this.fileStatus = fileStatus;
+      this.inputStream = inputStream;
+      this.closed = false;
+    }
+
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+      if (closed) {
+        throw new IOException("Channel is closed");
+      }
+      return inputStream.read(dst);
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long position() throws IOException {
+      if (closed) {
+        throw new IOException("Channel is closed");
+      }
+      return inputStream.getPos();
+    }
+
+    @Override
+    public SeekableByteChannel position(long newPosition) throws IOException {
+      if (closed) {
+        throw new IOException("Channel is closed");
+      }
+      inputStream.seek(newPosition);
+      return this;
+    }
+
+    @Override
+    public long size() throws IOException {
+      if (closed) {
+        throw new IOException("Channel is closed");
+      }
+      return fileStatus.getLen();
+    }
+
+    @Override
+    public SeekableByteChannel truncate(long size) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isOpen() {
+      return !closed;
+    }
+
+    @Override
+    public void close() throws IOException {
+      closed = true;
+      inputStream.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
new file mode 100644
index 0000000..2cb9d8a
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A Jackson {@link Module} that registers a {@link JsonSerializer} and {@link JsonDeserializer}
+ * for a Hadoop {@link Configuration}. The serialized representation is a JSON map.
+ *
+ * <p>Note that serialization of the Hadoop {@link Configuration} keeps only the keys and
+ * their values, dropping any configuration hierarchy and source information.
+ */
+@AutoService(Module.class)
+public class HadoopFileSystemModule extends SimpleModule {
+  public HadoopFileSystemModule() {
+    super("HadoopFileSystemModule");
+    setMixInAnnotation(Configuration.class, ConfigurationMixin.class);
+  }
+
+  /** A mixin class to add Jackson annotations to the Hadoop {@link Configuration} class. */
+  @JsonDeserialize(using = ConfigurationDeserializer.class)
+  @JsonSerialize(using = ConfigurationSerializer.class)
+  private static class ConfigurationMixin {}
+
+  /** A Jackson {@link JsonDeserializer} for Hadoop {@link Configuration} objects. */
+  static class ConfigurationDeserializer extends JsonDeserializer<Configuration> {
+    @Override
+    public Configuration deserialize(JsonParser jsonParser,
+        DeserializationContext deserializationContext)
+        throws IOException, JsonProcessingException {
+      Map<String, String> rawConfiguration =
+          jsonParser.readValueAs(new TypeReference<Map<String, String>>() {});
+      // Build the Configuration with loadDefaults=false so that only the entries present in
+      // the JSON map are set, without picking up core-default.xml/core-site.xml resources.
+      Configuration configuration = new Configuration(false);
+      for (Map.Entry<String, String> entry : rawConfiguration.entrySet()) {
+        configuration.set(entry.getKey(), entry.getValue());
+      }
+      return configuration;
+    }
+  }
+
+  /**
+   * A Jackson {@link JsonSerializer} for Hadoop {@link Configuration} objects.
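+   *
+   * <p>Entries are copied into a {@link TreeMap} before writing so that the JSON output is
+   * deterministic, with keys in sorted order.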
+   */
+  static class ConfigurationSerializer extends JsonSerializer<Configuration> {
+    @Override
+    public void serialize(Configuration configuration, JsonGenerator jsonGenerator,
+        SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
+      Map<String, String> map = new TreeMap<>();
+      for (Map.Entry<String, String> entry : configuration) {
+        map.put(entry.getKey(), entry.getValue());
+      }
+      jsonGenerator.writeObject(map);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
new file mode 100644
index 0000000..31250bc
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import java.util.List;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * {@link PipelineOptions} which encapsulate {@link Configuration Hadoop Configuration}
+ * for the {@link HadoopFileSystem}.
+ */
+public interface HadoopFileSystemOptions extends PipelineOptions {
+  @Description("A list of Hadoop configurations used to configure zero or more Hadoop filesystems. "
+      + "To specify on the command-line, represent the value as a JSON list of JSON maps, where "
+      + "each map represents the entire configuration for a single Hadoop filesystem. For example "
+      + "--hdfsConfiguration='[{\"fs.default.name\": \"hdfs://localhost:9998\", ...},"
+      + "{\"fs.default.name\": \"s3a://\", ...},...]'")
+  @Default.InstanceFactory(ConfigurationLocator.class)
+  List<Configuration> getHdfsConfiguration();
+  void setHdfsConfiguration(List<Configuration> value);
+
+  /**
+   * A {@link DefaultValueFactory} which locates a Hadoop {@link Configuration}.
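+   *
+   * <p>Locating a default configuration is not yet implemented; this factory currently
+   * returns null, so a configuration must be supplied explicitly, for example via
+   * --hdfsConfiguration.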
+   */
+  class ConfigurationLocator implements DefaultValueFactory<List<Configuration>> {
+    @Override
+    public List<Configuration> create(PipelineOptions options) {
+      // TODO: Find default configuration to use
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
new file mode 100644
index 0000000..344623b
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+
+/**
+ * {@link AutoService} registrar for {@link HadoopFileSystemOptions}.
+ */
+@AutoService(PipelineOptionsRegistrar.class)
+public class HadoopFileSystemOptionsRegistrar implements PipelineOptionsRegistrar {
+
+  @Override
+  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+    return ImmutableList.<Class<? extends PipelineOptions>>of(HadoopFileSystemOptions.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
new file mode 100644
index 0000000..9159df3
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.FileSystemRegistrar;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * {@link AutoService} registrar for the {@link HadoopFileSystem}.
+ */
+@AutoService(FileSystemRegistrar.class)
+public class HadoopFileSystemRegistrar implements FileSystemRegistrar {
+
+  @Override
+  public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
+    List<Configuration> configurations =
+        options.as(HadoopFileSystemOptions.class).getHdfsConfiguration();
+    if (configurations == null) {
+      configurations = Collections.emptyList();
+    }
+    checkArgument(configurations.size() <= 1,
+        "The %s currently supports at most one Hadoop configuration.",
+        HadoopFileSystemRegistrar.class.getSimpleName());
+
+    ImmutableList.Builder<FileSystem> builder = ImmutableList.builder();
+    for (Configuration configuration : configurations) {
+      try {
+        builder.add(new HadoopFileSystem(configuration));
+      } catch (IOException e) {
+        throw new IllegalArgumentException(String.format(
+            "Failed to construct Hadoop filesystem with configuration %s", configuration), e);
+      }
+    }
+    return builder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
new file mode 100644
index 0000000..e570864
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import java.net.URI;
+import java.util.Objects;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * {@link ResourceId} implementation for the {@link HadoopFileSystem}.
+ */
+class HadoopResourceId implements ResourceId {
+  private final URI uri;
+
+  HadoopResourceId(URI uri) {
+    this.uri = uri;
+  }
+
+  @Override
+  public ResourceId resolve(String other, ResolveOptions resolveOptions) {
+    // Resolving a directory must yield a URI with a trailing slash, matching the
+    // convention used by isDirectory() and getCurrentDirectory().
+    if (resolveOptions == ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY
+        && !other.endsWith("/")) {
+      other += "/";
+    }
+    return new HadoopResourceId(uri.resolve(other));
+  }
+
+  @Override
+  public ResourceId getCurrentDirectory() {
+    return new HadoopResourceId(uri.getPath().endsWith("/") ? uri : uri.resolve("."));
+  }
+
+  public boolean isDirectory() {
+    return uri.getPath().endsWith("/");
+  }
+
+  @Override
+  public String getFilename() {
+    return new Path(uri).getName();
+  }
+
+  @Override
+  public String getScheme() {
+    return uri.getScheme();
+  }
+
+  @Override
+  public String toString() {
+    return uri.toString();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof HadoopResourceId)) {
+      return false;
+    }
+    return Objects.equals(uri, ((HadoopResourceId) obj).uri);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(uri);
+  }
+
+  Path toPath() {
+    return new Path(uri);
+  }
+}
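----------------------------------------------------------------------

For reference, a minimal usage sketch (not part of this commit) showing how the pieces
above fit together: PipelineOptionsFactory discovers HadoopFileSystemModule through
@AutoService(Module.class) and uses it to parse the --hdfsConfiguration JSON described in
HadoopFileSystemOptions; HadoopFileSystemRegistrar then builds a HadoopFileSystem from the
resulting Configuration. The hdfs://localhost:9000 endpoint and the class name
HadoopFileSystemOptionsExample are hypothetical example values.

    import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class HadoopFileSystemOptionsExample {
      public static void main(String[] args) {
        // Parse a single Hadoop configuration from the JSON list-of-maps representation.
        HadoopFileSystemOptions options =
            PipelineOptionsFactory.fromArgs(
                "--hdfsConfiguration=[{\"fs.default.name\": \"hdfs://localhost:9000\"}]")
                .as(HadoopFileSystemOptions.class);
        // One Configuration per JSON map; the deserializer set each key/value verbatim.
        System.out.println(options.getHdfsConfiguration());
      }
    }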
