Repository: incubator-beam
Updated Branches:
  refs/heads/master bcefff6a3 -> c8ed39806
[BEAM-77] Move hadoop contrib into hdfs IO

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/404b633d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/404b633d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/404b633d

Branch: refs/heads/master
Commit: 404b633d43d23940a7f11a93a472980f7bb09ce7
Parents: bcefff6
Author: Jean-Baptiste Onofré <jbono...@apache.org>
Authored: Fri Apr 15 08:48:10 2016 +0200
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Apr 15 10:28:48 2016 -0700

----------------------------------------------------------------------
 contrib/hadoop/README.md                        |  24 -
 contrib/hadoop/pom.xml                          | 170 -------
 .../apache/contrib/hadoop/HadoopFileSource.java | 486 -------
 .../apache/contrib/hadoop/WritableCoder.java    | 111 -----
 .../contrib/hadoop/HadoopFileSourceTest.java    | 190 --------
 .../contrib/hadoop/WritableCoderTest.java       |  37 --
 sdks/java/io/hdfs/README.md                     |  24 +
 sdks/java/io/hdfs/pom.xml                       |  65 +++
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 486 +++++++++++++++++++
 .../apache/beam/sdk/io/hdfs/WritableCoder.java  | 111 +++++
 .../beam/sdk/io/hdfs/HDFSFileSourceTest.java    | 190 ++++++++
 .../beam/sdk/io/hdfs/WritableCoderTest.java     |  37 ++
 sdks/java/io/pom.xml                            |  41 ++
 sdks/java/pom.xml                               |   1 +
 14 files changed, 955 insertions(+), 1018 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/contrib/hadoop/README.md
----------------------------------------------------------------------
diff --git a/contrib/hadoop/README.md b/contrib/hadoop/README.md
deleted file mode 100644
index 49bbf98..0000000
--- a/contrib/hadoop/README.md
+++ /dev/null
@@ -1,24 +0,0 @@
-# Hadoop module
-
-This library provides Dataflow sources and sinks to make it possible to read and
-write Apache Hadoop file formats from Dataflow pipelines.
-
-Currently, only the read path is implemented. A `HadoopFileSource` allows any
-Hadoop `FileInputFormat` to be read as a `PCollection`.
-
-A `HadoopFileSource` can be read from using the
-`com.google.cloud.dataflow.sdk.io.Read` transform. For example:
-
-```java
-HadoopFileSource<K, V> source = HadoopFileSource.from(path, MyInputFormat.class,
-    MyKey.class, MyValue.class);
-PCollection<KV<MyKey, MyValue>> records = Read.from(mySource);
-```
-
-Alternatively, the `readFrom` method is a convenience method that returns a read
-transform. For example:
-
-```java
-PCollection<KV<MyKey, MyValue>> records = HadoopFileSource.readFrom(path,
-    MyInputFormat.class, MyKey.class, MyValue.class);
-```

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/contrib/hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/hadoop/pom.xml b/contrib/hadoop/pom.xml
deleted file mode 100644
index 24e454b..0000000
--- a/contrib/hadoop/pom.xml
+++ /dev/null
@@ -1,170 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements. See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <groupId>com.google.cloud.dataflow</groupId> - <artifactId>google-cloud-dataflow-java-contrib-hadoop</artifactId> - <name>Google Cloud Dataflow Hadoop Library</name> - <description>Library to read and write Hadoop file formats from Dataflow.</description> - <version>0.0.1-SNAPSHOT</version> - <packaging>jar</packaging> - - <licenses> - <license> - <name>Apache License, Version 2.0</name> - <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> - <distribution>repo</distribution> - </license> - </licenses> - - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <beam-version>[0.1.0, 1.0.0)</beam-version> - </properties> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.2</version> - <configuration> - <source>1.7</source> - <target>1.7</target> - </configuration> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <version>2.12</version> - <dependencies> - <dependency> - <groupId>com.puppycrawl.tools</groupId> - <artifactId>checkstyle</artifactId> - <version>6.6</version> - </dependency> - </dependencies> - <configuration> - <configLocation>../../sdks/java/checkstyle.xml</configLocation> - <consoleOutput>true</consoleOutput> - <failOnViolation>true</failOnViolation> - <includeTestSourceDirectory>true</includeTestSourceDirectory> - </configuration> - <executions> - <execution> - <goals> - <goal>check</goal> - </goals> - </execution> - </executions> - </plugin> - - <!-- Source plugin for generating source and test-source JARs. 
--> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-source-plugin</artifactId> - <version>2.4</version> - <executions> - <execution> - <id>attach-sources</id> - <phase>compile</phase> - <goals> - <goal>jar</goal> - </goals> - </execution> - <execution> - <id>attach-test-sources</id> - <phase>test-compile</phase> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <configuration> - <windowtitle>Google Cloud Dataflow Hadoop Contrib</windowtitle> - <doctitle>Google Cloud Dataflow Hadoop Contrib</doctitle> - - <subpackages>com.google.cloud.dataflow.contrib.hadoop</subpackages> - <use>false</use> - <bottom><![CDATA[<br>]]></bottom> - - <offlineLinks> - <offlineLink> - <url>https://cloud.google.com/dataflow/java-sdk/JavaDoc/</url> - <location>${basedir}/../../javadoc/dataflow-sdk-docs</location> - </offlineLink> - <offlineLink> - <url>http://docs.guava-libraries.googlecode.com/git-history/release18/javadoc/</url> - <location>${basedir}/../../javadoc/guava-docs</location> - </offlineLink> - </offlineLinks> - </configuration> - <executions> - <execution> - <goals> - <goal>jar</goal> - </goals> - <phase>package</phase> - </execution> - </executions> - </plugin> - </plugins> - </build> - - <dependencies> - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>java-sdk-all</artifactId> - <version>${beam-version}</version> - </dependency> - - <!-- @tomwhite: Hadoop doesn't have great RPC client compatibility between one version and - another so it's common to mark the Hadoop dependency as provided and have users specify the - version they need in their project. --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <version>2.7.0</version> - <scope>provided</scope> - </dependency> - - <!-- test dependencies --> - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest-all</artifactId> - <version>1.3</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.11</version> - <scope>test</scope> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/HadoopFileSource.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/HadoopFileSource.java b/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/HadoopFileSource.java deleted file mode 100644 index 65862f7..0000000 --- a/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/HadoopFileSource.java +++ /dev/null @@ -1,486 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.contrib.hadoop; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.values.KV; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.List; -import java.util.ListIterator; -import java.util.NoSuchElementException; -import javax.annotation.Nullable; - -/** - * A {@code BoundedSource} for reading files resident in a Hadoop filesystem using a - * Hadoop file-based input format. - * - * <p>To read a {@link org.apache.beam.sdk.values.PCollection} of - * {@link org.apache.beam.sdk.values.KV} key-value pairs from one or more - * Hadoop files, use {@link HadoopFileSource#from} to specify the path(s) of the files to - * read, the Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, the - * key class and the value class. - * - * <p>A {@code HadoopFileSource} can be read from using the - * {@link org.apache.beam.sdk.io.Read} transform. For example: - * - * <pre> - * {@code - * HadoopFileSource<K, V> source = HadoopFileSource.from(path, MyInputFormat.class, - * MyKey.class, MyValue.class); - * PCollection<KV<MyKey, MyValue>> records = Read.from(mySource); - * } - * </pre> - * - * <p>The {@link HadoopFileSource#readFrom} method is a convenience method - * that returns a read transform. For example: - * - * <pre> - * {@code - * PCollection<KV<MyKey, MyValue>> records = HadoopFileSource.readFrom(path, - * MyInputFormat.class, MyKey.class, MyValue.class); - * } - * </pre> - * - * Implementation note: Since Hadoop's {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat} - * determines the input splits, this class extends {@link BoundedSource} rather than - * {@link org.apache.beam.sdk.io.OffsetBasedSource}, since the latter - * dictates input splits. - - * @param <K> The type of keys to be read from the source. - * @param <V> The type of values to be read from the source. - */ -public class HadoopFileSource<K, V> extends BoundedSource<KV<K, V>> { - private static final long serialVersionUID = 0L; - - private final String filepattern; - private final Class<? 
extends FileInputFormat<?, ?>> formatClass; - private final Class<K> keyClass; - private final Class<V> valueClass; - private final SerializableSplit serializableSplit; - - /** - * Creates a {@code Read} transform that will read from an {@code HadoopFileSource} - * with the given file name or pattern ("glob") using the given Hadoop - * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, - * with key-value types specified by the given key class and value class. - */ - public static <K, V, T extends FileInputFormat<K, V>> Read.Bounded<KV<K, V>> readFrom( - String filepattern, Class<T> formatClass, Class<K> keyClass, Class<V> valueClass) { - return Read.from(from(filepattern, formatClass, keyClass, valueClass)); - } - - /** - * Creates a {@code HadoopFileSource} that reads from the given file name or pattern ("glob") - * using the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, - * with key-value types specified by the given key class and value class. - */ - public static <K, V, T extends FileInputFormat<K, V>> HadoopFileSource<K, V> from( - String filepattern, Class<T> formatClass, Class<K> keyClass, Class<V> valueClass) { - @SuppressWarnings("unchecked") - HadoopFileSource<K, V> source = (HadoopFileSource<K, V>) - new HadoopFileSource(filepattern, formatClass, keyClass, valueClass); - return source; - } - - /** - * Create a {@code HadoopFileSource} based on a file or a file pattern specification. - */ - private HadoopFileSource(String filepattern, - Class<? extends FileInputFormat<?, ?>> formatClass, Class<K> keyClass, - Class<V> valueClass) { - this(filepattern, formatClass, keyClass, valueClass, null); - } - - /** - * Create a {@code HadoopFileSource} based on a single Hadoop input split, which won't be - * split up further. - */ - private HadoopFileSource(String filepattern, - Class<? extends FileInputFormat<?, ?>> formatClass, Class<K> keyClass, - Class<V> valueClass, SerializableSplit serializableSplit) { - this.filepattern = filepattern; - this.formatClass = formatClass; - this.keyClass = keyClass; - this.valueClass = valueClass; - this.serializableSplit = serializableSplit; - } - - public String getFilepattern() { - return filepattern; - } - - public Class<? extends FileInputFormat<?, ?>> getFormatClass() { - return formatClass; - } - - public Class<K> getKeyClass() { - return keyClass; - } - - public Class<V> getValueClass() { - return valueClass; - } - - @Override - public void validate() { - Preconditions.checkNotNull(filepattern, - "need to set the filepattern of a HadoopFileSource"); - Preconditions.checkNotNull(formatClass, - "need to set the format class of a HadoopFileSource"); - Preconditions.checkNotNull(keyClass, - "need to set the key class of a HadoopFileSource"); - Preconditions.checkNotNull(valueClass, - "need to set the value class of a HadoopFileSource"); - } - - @Override - public List<? 
extends BoundedSource<KV<K, V>>> splitIntoBundles(long desiredBundleSizeBytes, - PipelineOptions options) throws Exception { - if (serializableSplit == null) { - return Lists.transform(computeSplits(desiredBundleSizeBytes), - new Function<InputSplit, BoundedSource<KV<K, V>>>() { - @Nullable @Override - public BoundedSource<KV<K, V>> apply(@Nullable InputSplit inputSplit) { - return new HadoopFileSource<K, V>(filepattern, formatClass, keyClass, - valueClass, new SerializableSplit(inputSplit)); - } - }); - } else { - return ImmutableList.of(this); - } - } - - private FileInputFormat<?, ?> createFormat(Job job) throws IOException, IllegalAccessException, - InstantiationException { - Path path = new Path(filepattern); - FileInputFormat.addInputPath(job, path); - return formatClass.newInstance(); - } - - private List<InputSplit> computeSplits(long desiredBundleSizeBytes) throws IOException, - IllegalAccessException, InstantiationException { - Job job = Job.getInstance(); - FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes); - FileInputFormat.setMaxInputSplitSize(job, desiredBundleSizeBytes); - return createFormat(job).getSplits(job); - } - - @Override - public BoundedReader<KV<K, V>> createReader(PipelineOptions options) throws IOException { - this.validate(); - - if (serializableSplit == null) { - return new HadoopFileReader<>(this, filepattern, formatClass); - } else { - return new HadoopFileReader<>(this, filepattern, formatClass, - serializableSplit.getSplit()); - } - } - - @Override - public Coder<KV<K, V>> getDefaultOutputCoder() { - return KvCoder.of(getDefaultCoder(keyClass), getDefaultCoder(valueClass)); - } - - @SuppressWarnings("unchecked") - private <T> Coder<T> getDefaultCoder(Class<T> c) { - if (Writable.class.isAssignableFrom(c)) { - Class<? extends Writable> writableClass = (Class<? extends Writable>) c; - return (Coder<T>) WritableCoder.of(writableClass); - } else if (Void.class.equals(c)) { - return (Coder<T>) VoidCoder.of(); - } - // TODO: how to use registered coders here? 
- throw new IllegalStateException("Cannot find coder for " + c); - } - - // BoundedSource - - @Override - public long getEstimatedSizeBytes(PipelineOptions options) { - long size = 0; - try { - Job job = Job.getInstance(); // new instance - for (FileStatus st : listStatus(createFormat(job), job)) { - size += st.getLen(); - } - } catch (IOException | NoSuchMethodException | InvocationTargetException - | IllegalAccessException | InstantiationException e) { - // ignore, and return 0 - } - return size; - } - - private <K, V> List<FileStatus> listStatus(FileInputFormat<K, V> format, - JobContext jobContext) throws NoSuchMethodException, InvocationTargetException, - IllegalAccessException { - // FileInputFormat#listStatus is protected, so call using reflection - Method listStatus = FileInputFormat.class.getDeclaredMethod("listStatus", JobContext.class); - listStatus.setAccessible(true); - @SuppressWarnings("unchecked") - List<FileStatus> stat = (List<FileStatus>) listStatus.invoke(format, jobContext); - return stat; - } - - @Override - public boolean producesSortedKeys(PipelineOptions options) throws Exception { - return false; - } - - static class HadoopFileReader<K, V> extends BoundedSource.BoundedReader<KV<K, V>> { - - private final BoundedSource<KV<K, V>> source; - private final String filepattern; - private final Class formatClass; - - private FileInputFormat<?, ?> format; - private TaskAttemptContext attemptContext; - private List<InputSplit> splits; - private ListIterator<InputSplit> splitsIterator; - private Configuration conf; - private RecordReader<K, V> currentReader; - private KV<K, V> currentPair; - - /** - * Create a {@code HadoopFileReader} based on a file or a file pattern specification. - */ - public HadoopFileReader(BoundedSource<KV<K, V>> source, String filepattern, - Class<? extends FileInputFormat<?, ?>> formatClass) { - this(source, filepattern, formatClass, null); - } - - /** - * Create a {@code HadoopFileReader} based on a single Hadoop input split. - */ - public HadoopFileReader(BoundedSource<KV<K, V>> source, String filepattern, - Class<? 
extends FileInputFormat<?, ?>> formatClass, InputSplit split) { - this.source = source; - this.filepattern = filepattern; - this.formatClass = formatClass; - if (split != null) { - this.splits = ImmutableList.of(split); - this.splitsIterator = splits.listIterator(); - } - } - - @Override - public boolean start() throws IOException { - Job job = Job.getInstance(); // new instance - Path path = new Path(filepattern); - FileInputFormat.addInputPath(job, path); - - try { - @SuppressWarnings("unchecked") - FileInputFormat<K, V> f = (FileInputFormat<K, V>) formatClass.newInstance(); - this.format = f; - } catch (InstantiationException | IllegalAccessException e) { - throw new IOException("Cannot instantiate file input format " + formatClass, e); - } - this.attemptContext = new TaskAttemptContextImpl(job.getConfiguration(), - new TaskAttemptID()); - - if (splitsIterator == null) { - this.splits = format.getSplits(job); - this.splitsIterator = splits.listIterator(); - } - this.conf = job.getConfiguration(); - return advance(); - } - - @Override - public boolean advance() throws IOException { - try { - if (currentReader != null && currentReader.nextKeyValue()) { - currentPair = nextPair(); - return true; - } else { - while (splitsIterator.hasNext()) { - // advance the reader and see if it has records - InputSplit nextSplit = splitsIterator.next(); - @SuppressWarnings("unchecked") - RecordReader<K, V> reader = - (RecordReader<K, V>) format.createRecordReader(nextSplit, attemptContext); - if (currentReader != null) { - currentReader.close(); - } - currentReader = reader; - currentReader.initialize(nextSplit, attemptContext); - if (currentReader.nextKeyValue()) { - currentPair = nextPair(); - return true; - } - currentReader.close(); - currentReader = null; - } - // either no next split or all readers were empty - currentPair = null; - return false; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException(e); - } - } - - @SuppressWarnings("unchecked") - private KV<K, V> nextPair() throws IOException, InterruptedException { - K key = currentReader.getCurrentKey(); - V value = currentReader.getCurrentValue(); - // clone Writable objects since they are reused between calls to RecordReader#nextKeyValue - if (key instanceof Writable) { - key = (K) WritableUtils.clone((Writable) key, conf); - } - if (value instanceof Writable) { - value = (V) WritableUtils.clone((Writable) value, conf); - } - return KV.of(key, value); - } - - @Override - public KV<K, V> getCurrent() throws NoSuchElementException { - if (currentPair == null) { - throw new NoSuchElementException(); - } - return currentPair; - } - - @Override - public void close() throws IOException { - if (currentReader != null) { - currentReader.close(); - currentReader = null; - } - currentPair = null; - } - - @Override - public BoundedSource<KV<K, V>> getCurrentSource() { - return source; - } - - // BoundedReader - - @Override - public Double getFractionConsumed() { - if (currentReader == null) { - return 0.0; - } - if (splits.isEmpty()) { - return 1.0; - } - int index = splitsIterator.previousIndex(); - int numReaders = splits.size(); - if (index == numReaders) { - return 1.0; - } - double before = 1.0 * index / numReaders; - double after = 1.0 * (index + 1) / numReaders; - Double fractionOfCurrentReader = getProgress(); - if (fractionOfCurrentReader == null) { - return before; - } - return before + fractionOfCurrentReader * (after - before); - } - - private Double getProgress() { - try { - return (double) 
currentReader.getProgress(); - } catch (IOException | InterruptedException e) { - return null; - } - } - - @Override - public BoundedSource<KV<K, V>> splitAtFraction(double fraction) { - // Not yet supported. To implement this, the sizes of the splits should be used to - // calculate the remaining splits that constitute the given fraction, then a - // new source backed by those splits should be returned. - return null; - } - } - - /** - * A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit}s to be - * serialized using Java's standard serialization mechanisms. Note that the InputSplit - * has to be Writable (which most are). - */ - public static class SerializableSplit implements Externalizable { - private static final long serialVersionUID = 0L; - - private InputSplit split; - - public SerializableSplit() { - } - - public SerializableSplit(InputSplit split) { - Preconditions.checkArgument(split instanceof Writable, "Split is not writable: " - + split); - this.split = split; - } - - public InputSplit getSplit() { - return split; - } - - @Override - public void writeExternal(ObjectOutput out) throws IOException { - out.writeUTF(split.getClass().getCanonicalName()); - ((Writable) split).write(out); - } - - @Override - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - String className = in.readUTF(); - try { - split = (InputSplit) Class.forName(className).newInstance(); - ((Writable) split).readFields(in); - } catch (InstantiationException | IllegalAccessException e) { - throw new IOException(e); - } - } - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/WritableCoder.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/WritableCoder.java b/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/WritableCoder.java deleted file mode 100644 index 180875d..0000000 --- a/contrib/hadoop/src/main/java/org/apache/contrib/hadoop/WritableCoder.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.contrib.hadoop; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.StandardCoder; -import org.apache.beam.sdk.util.CloudObject; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.hadoop.io.Writable; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; - -/** - * A {@code WritableCoder} is a {@link org.apache.beam.sdk.coders.Coder} for a - * Java class that implements {@link org.apache.hadoop.io.Writable}. - * - * <p> To use, specify the coder type on a PCollection: - * <pre> - * {@code - * PCollection<MyRecord> records = - * foo.apply(...).setCoder(WritableCoder.of(MyRecord.class)); - * } - * </pre> - * - * @param <T> the type of elements handled by this coder - */ -public class WritableCoder<T extends Writable> extends StandardCoder<T> { - private static final long serialVersionUID = 0L; - - /** - * Returns a {@code WritableCoder} instance for the provided element class. - * @param <T> the element type - */ - public static <T extends Writable> WritableCoder<T> of(Class<T> clazz) { - return new WritableCoder<>(clazz); - } - - @JsonCreator - @SuppressWarnings("unchecked") - public static WritableCoder<?> of(@JsonProperty("type") String classType) - throws ClassNotFoundException { - Class<?> clazz = Class.forName(classType); - if (!Writable.class.isAssignableFrom(clazz)) { - throw new ClassNotFoundException( - "Class " + classType + " does not implement Writable"); - } - return of((Class<? extends Writable>) clazz); - } - - private final Class<T> type; - - public WritableCoder(Class<T> type) { - this.type = type; - } - - @Override - public void encode(T value, OutputStream outStream, Context context) throws IOException { - value.write(new DataOutputStream(outStream)); - } - - @Override - public T decode(InputStream inStream, Context context) throws IOException { - try { - T t = type.newInstance(); - t.readFields(new DataInputStream(inStream)); - return t; - } catch (InstantiationException | IllegalAccessException e) { - throw new CoderException("unable to deserialize record", e); - } - } - - @Override - public List<Coder<?>> getCoderArguments() { - return null; - } - - @Override - public CloudObject asCloudObject() { - CloudObject result = super.asCloudObject(); - result.put("type", type.getName()); - return result; - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - throw new NonDeterministicException(this, - "Hadoop Writable may be non-deterministic."); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/HadoopFileSourceTest.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/HadoopFileSourceTest.java b/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/HadoopFileSourceTest.java deleted file mode 100644 index 72bd72a..0000000 --- a/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/HadoopFileSourceTest.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.contrib.hadoop; - -import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.Source; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.SourceTestUtils; -import org.apache.beam.sdk.values.KV; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.Writer; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -/** - * Tests for HadoopFileSource. 
- */ -public class HadoopFileSourceTest { - - Random random = new Random(0L); - - @Rule - public TemporaryFolder tmpFolder = new TemporaryFolder(); - - @Test - public void testFullyReadSingleFile() throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); - List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0); - File file = createFileWithData("tmp.seq", expectedResults); - - HadoopFileSource<IntWritable, Text> source = - HadoopFileSource.from(file.toString(), SequenceFileInputFormat.class, - IntWritable.class, Text.class); - - assertEquals(file.length(), source.getEstimatedSizeBytes(null)); - - assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray())); - } - - @Test - public void testFullyReadFilePattern() throws IOException { - PipelineOptions options = PipelineOptionsFactory.create(); - List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0); - File file1 = createFileWithData("file1", data1); - - List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10); - createFileWithData("file2", data2); - - List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20); - createFileWithData("file3", data3); - - List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30); - createFileWithData("otherfile", data4); - - HadoopFileSource<IntWritable, Text> source = - HadoopFileSource.from(new File(file1.getParent(), "file*").toString(), - SequenceFileInputFormat.class, IntWritable.class, Text.class); - List<KV<IntWritable, Text>> expectedResults = new ArrayList<>(); - expectedResults.addAll(data1); - expectedResults.addAll(data2); - expectedResults.addAll(data3); - assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray())); - } - - @Test - public void testCloseUnstartedFilePatternReader() throws IOException { - PipelineOptions options = PipelineOptionsFactory.create(); - List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0); - File file1 = createFileWithData("file1", data1); - - List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10); - createFileWithData("file2", data2); - - List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20); - createFileWithData("file3", data3); - - List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30); - createFileWithData("otherfile", data4); - - HadoopFileSource<IntWritable, Text> source = - HadoopFileSource.from(new File(file1.getParent(), "file*").toString(), - SequenceFileInputFormat.class, IntWritable.class, Text.class); - Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options); - // Closing an unstarted FilePatternReader should not throw an exception. - try { - reader.close(); - } catch (Exception e) { - fail("Closing an unstarted FilePatternReader should not throw an exception"); - } - } - - @Test - public void testSplits() throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); - - List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0); - File file = createFileWithData("tmp.avro", expectedResults); - - HadoopFileSource<IntWritable, Text> source = - HadoopFileSource.from(file.toString(), SequenceFileInputFormat.class, - IntWritable.class, Text.class); - - // Assert that the source produces the expected records - assertEquals(expectedResults, readFromSource(source, options)); - - // Split with a small bundle size (has to be at least size of sync interval) - List<? 
extends BoundedSource<KV<IntWritable, Text>>> splits = source - .splitIntoBundles(SequenceFile.SYNC_INTERVAL, options); - assertTrue(splits.size() > 2); - SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); - int nonEmptySplits = 0; - for (BoundedSource<KV<IntWritable, Text>> subSource : splits) { - if (readFromSource(subSource, options).size() > 0) { - nonEmptySplits += 1; - } - } - assertTrue(nonEmptySplits > 2); - } - - private File createFileWithData(String filename, List<KV<IntWritable, Text>> records) - throws IOException { - File tmpFile = tmpFolder.newFile(filename); - try (Writer writer = SequenceFile.createWriter(new Configuration(), - Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class), - Writer.file(new Path(tmpFile.toURI())))) { - - for (KV<IntWritable, Text> record : records) { - writer.append(record.getKey(), record.getValue()); - } - } - return tmpFile; - } - - private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength, - int numItems, int offset) { - List<KV<IntWritable, Text>> records = new ArrayList<>(); - for (int i = 0; i < numItems; i++) { - IntWritable key = new IntWritable(i + offset); - Text value = new Text(createRandomString(dataItemLength)); - records.add(KV.of(key, value)); - } - return records; - } - - private String createRandomString(int length) { - char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray(); - StringBuilder builder = new StringBuilder(); - for (int i = 0; i < length; i++) { - builder.append(chars[random.nextInt(chars.length)]); - } - return builder.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/WritableCoderTest.java ---------------------------------------------------------------------- diff --git a/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/WritableCoderTest.java b/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/WritableCoderTest.java deleted file mode 100644 index 368b682..0000000 --- a/contrib/hadoop/src/test/java/org/apache/contrib/hadoop/WritableCoderTest.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.contrib.hadoop; - -import org.apache.beam.sdk.testing.CoderProperties; - -import org.apache.hadoop.io.IntWritable; -import org.junit.Test; - -/** - * Tests for WritableCoder. 
- */ -public class WritableCoderTest { - - @Test - public void testIntWritableEncoding() throws Exception { - IntWritable value = new IntWritable(42); - WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class); - - CoderProperties.coderDecodeEncodeEqual(coder, value); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/hdfs/README.md ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/README.md b/sdks/java/io/hdfs/README.md new file mode 100644 index 0000000..7149cda --- /dev/null +++ b/sdks/java/io/hdfs/README.md @@ -0,0 +1,24 @@ +# HDFS IO + +This library provides HDFS sources and sinks to make it possible to read and +write Apache Hadoop file formats from Apache Beam pipelines. + +Currently, only the read path is implemented. A `HDFSFileSource` allows any +Hadoop `FileInputFormat` to be read as a `PCollection`. + +A `HDFSFileSource` can be read from using the +`org.apache.beam.sdk.io.Read` transform. For example: + +```java +HDFSFileSource<K, V> source = HDFSFileSource.from(path, MyInputFormat.class, + MyKey.class, MyValue.class); +PCollection<KV<MyKey, MyValue>> records = Read.from(mySource); +``` + +Alternatively, the `readFrom` method is a convenience method that returns a read +transform. For example: + +```java +PCollection<KV<MyKey, MyValue>> records = HDFSFileSource.readFrom(path, + MyInputFormat.class, MyKey.class, MyValue.class); +``` http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml new file mode 100644 index 0000000..3eaef62 --- /dev/null +++ b/sdks/java/io/hdfs/pom.xml @@ -0,0 +1,65 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>io-parent</artifactId> + <version>0.1.0-incubating-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>hdfs</artifactId> + <name>Apache Beam :: SDKs :: Java :: IO :: HDFS</name> + <description>Library to read and write Hadoop/HDFS file formats from Beam.</description> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>java-sdk-all</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>2.7.0</version> + <scope>provided</scope> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>1.3</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java new file mode 100644 index 0000000..ab537eb --- /dev/null +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.hdfs; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.values.KV; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; +import java.util.ListIterator; +import java.util.NoSuchElementException; +import javax.annotation.Nullable; + +/** + * A {@code BoundedSource} for reading files resident in a Hadoop filesystem (HDFS) using a + * Hadoop file-based input format. + * + * <p>To read a {@link org.apache.beam.sdk.values.PCollection} of + * {@link org.apache.beam.sdk.values.KV} key-value pairs from one or more + * Hadoop files, use {@link HDFSFileSource#from} to specify the path(s) of the files to + * read, the Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, the + * key class and the value class. + * + * <p>A {@code HDFSFileSource} can be read from using the + * {@link org.apache.beam.sdk.io.Read} transform. For example: + * + * <pre> + * {@code + * HDFSFileSource<K, V> source = HDFSFileSource.from(path, MyInputFormat.class, + * MyKey.class, MyValue.class); + * PCollection<KV<MyKey, MyValue>> records = Read.from(mySource); + * } + * </pre> + * + * <p>The {@link HDFSFileSource#readFrom} method is a convenience method + * that returns a read transform. For example: + * + * <pre> + * {@code + * PCollection<KV<MyKey, MyValue>> records = HDFSFileSource.readFrom(path, + * MyInputFormat.class, MyKey.class, MyValue.class); + * } + * </pre> + * + * Implementation note: Since Hadoop's {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat} + * determines the input splits, this class extends {@link BoundedSource} rather than + * {@link org.apache.beam.sdk.io.OffsetBasedSource}, since the latter + * dictates input splits. + + * @param <K> The type of keys to be read from the source. + * @param <V> The type of values to be read from the source. + */ +public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> { + private static final long serialVersionUID = 0L; + + private final String filepattern; + private final Class<? 
extends FileInputFormat<?, ?>> formatClass; + private final Class<K> keyClass; + private final Class<V> valueClass; + private final SerializableSplit serializableSplit; + + /** + * Creates a {@code Read} transform that will read from an {@code HDFSFileSource} + * with the given file name or pattern ("glob") using the given Hadoop + * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, + * with key-value types specified by the given key class and value class. + */ + public static <K, V, T extends FileInputFormat<K, V>> Read.Bounded<KV<K, V>> readFrom( + String filepattern, Class<T> formatClass, Class<K> keyClass, Class<V> valueClass) { + return Read.from(from(filepattern, formatClass, keyClass, valueClass)); + } + + /** + * Creates a {@code HDFSFileSource} that reads from the given file name or pattern ("glob") + * using the given Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, + * with key-value types specified by the given key class and value class. + */ + public static <K, V, T extends FileInputFormat<K, V>> HDFSFileSource<K, V> from( + String filepattern, Class<T> formatClass, Class<K> keyClass, Class<V> valueClass) { + @SuppressWarnings("unchecked") + HDFSFileSource<K, V> source = (HDFSFileSource<K, V>) + new HDFSFileSource(filepattern, formatClass, keyClass, valueClass); + return source; + } + + /** + * Create a {@code HDFSFileSource} based on a file or a file pattern specification. + */ + private HDFSFileSource(String filepattern, + Class<? extends FileInputFormat<?, ?>> formatClass, Class<K> keyClass, + Class<V> valueClass) { + this(filepattern, formatClass, keyClass, valueClass, null); + } + + /** + * Create a {@code HDFSFileSource} based on a single Hadoop input split, which won't be + * split up further. + */ + private HDFSFileSource(String filepattern, + Class<? extends FileInputFormat<?, ?>> formatClass, Class<K> keyClass, + Class<V> valueClass, SerializableSplit serializableSplit) { + this.filepattern = filepattern; + this.formatClass = formatClass; + this.keyClass = keyClass; + this.valueClass = valueClass; + this.serializableSplit = serializableSplit; + } + + public String getFilepattern() { + return filepattern; + } + + public Class<? extends FileInputFormat<?, ?>> getFormatClass() { + return formatClass; + } + + public Class<K> getKeyClass() { + return keyClass; + } + + public Class<V> getValueClass() { + return valueClass; + } + + @Override + public void validate() { + Preconditions.checkNotNull(filepattern, + "need to set the filepattern of a HDFSFileSource"); + Preconditions.checkNotNull(formatClass, + "need to set the format class of a HDFSFileSource"); + Preconditions.checkNotNull(keyClass, + "need to set the key class of a HDFSFileSource"); + Preconditions.checkNotNull(valueClass, + "need to set the value class of a HDFSFileSource"); + } + + @Override + public List<? 
extends BoundedSource<KV<K, V>>> splitIntoBundles(long desiredBundleSizeBytes, + PipelineOptions options) throws Exception { + if (serializableSplit == null) { + return Lists.transform(computeSplits(desiredBundleSizeBytes), + new Function<InputSplit, BoundedSource<KV<K, V>>>() { + @Nullable @Override + public BoundedSource<KV<K, V>> apply(@Nullable InputSplit inputSplit) { + return new HDFSFileSource<K, V>(filepattern, formatClass, keyClass, + valueClass, new SerializableSplit(inputSplit)); + } + }); + } else { + return ImmutableList.of(this); + } + } + + private FileInputFormat<?, ?> createFormat(Job job) throws IOException, IllegalAccessException, + InstantiationException { + Path path = new Path(filepattern); + FileInputFormat.addInputPath(job, path); + return formatClass.newInstance(); + } + + private List<InputSplit> computeSplits(long desiredBundleSizeBytes) throws IOException, + IllegalAccessException, InstantiationException { + Job job = Job.getInstance(); + FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes); + FileInputFormat.setMaxInputSplitSize(job, desiredBundleSizeBytes); + return createFormat(job).getSplits(job); + } + + @Override + public BoundedReader<KV<K, V>> createReader(PipelineOptions options) throws IOException { + this.validate(); + + if (serializableSplit == null) { + return new HDFSFileReader<>(this, filepattern, formatClass); + } else { + return new HDFSFileReader<>(this, filepattern, formatClass, + serializableSplit.getSplit()); + } + } + + @Override + public Coder<KV<K, V>> getDefaultOutputCoder() { + return KvCoder.of(getDefaultCoder(keyClass), getDefaultCoder(valueClass)); + } + + @SuppressWarnings("unchecked") + private <T> Coder<T> getDefaultCoder(Class<T> c) { + if (Writable.class.isAssignableFrom(c)) { + Class<? extends Writable> writableClass = (Class<? extends Writable>) c; + return (Coder<T>) WritableCoder.of(writableClass); + } else if (Void.class.equals(c)) { + return (Coder<T>) VoidCoder.of(); + } + // TODO: how to use registered coders here? 
+    throw new IllegalStateException("Cannot find coder for " + c);
+  }
+
+  // BoundedSource
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) {
+    long size = 0;
+    try {
+      Job job = Job.getInstance(); // new instance
+      for (FileStatus st : listStatus(createFormat(job), job)) {
+        size += st.getLen();
+      }
+    } catch (IOException | NoSuchMethodException | InvocationTargetException
+        | IllegalAccessException | InstantiationException e) {
+      // ignore, and return 0
+    }
+    return size;
+  }
+
+  private <K, V> List<FileStatus> listStatus(FileInputFormat<K, V> format,
+      JobContext jobContext) throws NoSuchMethodException, InvocationTargetException,
+      IllegalAccessException {
+    // FileInputFormat#listStatus is protected, so call using reflection
+    Method listStatus = FileInputFormat.class.getDeclaredMethod("listStatus", JobContext.class);
+    listStatus.setAccessible(true);
+    @SuppressWarnings("unchecked")
+    List<FileStatus> stat = (List<FileStatus>) listStatus.invoke(format, jobContext);
+    return stat;
+  }
+
+  @Override
+  public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+    return false;
+  }
+
+  static class HDFSFileReader<K, V> extends BoundedSource.BoundedReader<KV<K, V>> {
+
+    private final BoundedSource<KV<K, V>> source;
+    private final String filepattern;
+    private final Class formatClass;
+
+    private FileInputFormat<?, ?> format;
+    private TaskAttemptContext attemptContext;
+    private List<InputSplit> splits;
+    private ListIterator<InputSplit> splitsIterator;
+    private Configuration conf;
+    private RecordReader<K, V> currentReader;
+    private KV<K, V> currentPair;
+
+    /**
+     * Create a {@code HDFSFileReader} based on a file or a file pattern specification.
+     */
+    public HDFSFileReader(BoundedSource<KV<K, V>> source, String filepattern,
+        Class<? extends FileInputFormat<?, ?>> formatClass) {
+      this(source, filepattern, formatClass, null);
+    }
+
+    /**
+     * Create a {@code HDFSFileReader} based on a single Hadoop input split.
+     */
+    public HDFSFileReader(BoundedSource<KV<K, V>> source, String filepattern,
+        Class<? extends FileInputFormat<?, ?>> formatClass, InputSplit split) {
+      this.source = source;
+      this.filepattern = filepattern;
+      this.formatClass = formatClass;
+      if (split != null) {
+        this.splits = ImmutableList.of(split);
+        this.splitsIterator = splits.listIterator();
+      }
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      Job job = Job.getInstance(); // new instance
+      Path path = new Path(filepattern);
+      FileInputFormat.addInputPath(job, path);
+
+      try {
+        @SuppressWarnings("unchecked")
+        FileInputFormat<K, V> f = (FileInputFormat<K, V>) formatClass.newInstance();
+        this.format = f;
+      } catch (InstantiationException | IllegalAccessException e) {
+        throw new IOException("Cannot instantiate file input format " + formatClass, e);
+      }
+      this.attemptContext = new TaskAttemptContextImpl(job.getConfiguration(),
+          new TaskAttemptID());
+
+      if (splitsIterator == null) {
+        this.splits = format.getSplits(job);
+        this.splitsIterator = splits.listIterator();
+      }
+      this.conf = job.getConfiguration();
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      try {
+        if (currentReader != null && currentReader.nextKeyValue()) {
+          currentPair = nextPair();
+          return true;
+        } else {
+          while (splitsIterator.hasNext()) {
+            // advance the reader and see if it has records
+            InputSplit nextSplit = splitsIterator.next();
+            @SuppressWarnings("unchecked")
+            RecordReader<K, V> reader =
+                (RecordReader<K, V>) format.createRecordReader(nextSplit, attemptContext);
+            if (currentReader != null) {
+              currentReader.close();
+            }
+            currentReader = reader;
+            currentReader.initialize(nextSplit, attemptContext);
+            if (currentReader.nextKeyValue()) {
+              currentPair = nextPair();
+              return true;
+            }
+            currentReader.close();
+            currentReader = null;
+          }
+          // either no next split or all readers were empty
+          currentPair = null;
+          return false;
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    private KV<K, V> nextPair() throws IOException, InterruptedException {
+      K key = currentReader.getCurrentKey();
+      V value = currentReader.getCurrentValue();
+      // clone Writable objects since they are reused between calls to RecordReader#nextKeyValue
+      if (key instanceof Writable) {
+        key = (K) WritableUtils.clone((Writable) key, conf);
+      }
+      if (value instanceof Writable) {
+        value = (V) WritableUtils.clone((Writable) value, conf);
+      }
+      return KV.of(key, value);
+    }
+
+    @Override
+    public KV<K, V> getCurrent() throws NoSuchElementException {
+      if (currentPair == null) {
+        throw new NoSuchElementException();
+      }
+      return currentPair;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (currentReader != null) {
+        currentReader.close();
+        currentReader = null;
+      }
+      currentPair = null;
+    }
+
+    @Override
+    public BoundedSource<KV<K, V>> getCurrentSource() {
+      return source;
+    }
+
+    // BoundedReader
+
+    @Override
+    public Double getFractionConsumed() {
+      if (currentReader == null) {
+        return 0.0;
+      }
+      if (splits.isEmpty()) {
+        return 1.0;
+      }
+      int index = splitsIterator.previousIndex();
+      int numReaders = splits.size();
+      if (index == numReaders) {
+        return 1.0;
+      }
+      double before = 1.0 * index / numReaders;
+      double after = 1.0 * (index + 1) / numReaders;
+      Double fractionOfCurrentReader = getProgress();
+      if (fractionOfCurrentReader == null) {
+        return before;
+      }
+      return before + fractionOfCurrentReader * (after - before);
+    }
+
+    private Double getProgress() {
+      try {
+        return (double) currentReader.getProgress();
+      } catch (IOException | InterruptedException e) {
+        return null;
+      }
+    }
+
+    @Override
+    public BoundedSource<KV<K, V>> splitAtFraction(double fraction) {
+      // Not yet supported. To implement this, the sizes of the splits should be used to
+      // calculate the remaining splits that constitute the given fraction, then a
+      // new source backed by those splits should be returned.
+      return null;
+    }
+  }
+
+  /**
+   * A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit}s to be
+   * serialized using Java's standard serialization mechanisms. Note that the InputSplit
+   * has to be Writable (which most are).
+   */
+  public static class SerializableSplit implements Externalizable {
+    private static final long serialVersionUID = 0L;
+
+    private InputSplit split;
+
+    public SerializableSplit() {
+    }
+
+    public SerializableSplit(InputSplit split) {
+      Preconditions.checkArgument(split instanceof Writable, "Split is not writable: "
+          + split);
+      this.split = split;
+    }
+
+    public InputSplit getSplit() {
+      return split;
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+      out.writeUTF(split.getClass().getCanonicalName());
+      ((Writable) split).write(out);
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+      String className = in.readUTF();
+      try {
+        split = (InputSplit) Class.forName(className).newInstance();
+        ((Writable) split).readFields(in);
+      } catch (InstantiationException | IllegalAccessException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+
+}
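A note on the progress logic above: `getFractionConsumed()` treats every split as an equal share of the work and linearly interpolates the current reader's own progress into that share. A standalone sketch of the arithmetic, with made-up numbers (illustrative only, not part of this commit):

```java
public class FractionConsumedSketch {
  public static void main(String[] args) {
    // Mirrors getFractionConsumed() above: 4 splits, reader on split index 1,
    // and the current RecordReader reporting 50% progress.
    int index = 1;          // splitsIterator.previousIndex()
    int numReaders = 4;     // splits.size()
    double before = 1.0 * index / numReaders;        // 0.25 consumed before this split
    double after = 1.0 * (index + 1) / numReaders;   // 0.50 once this split is done
    double progress = 0.5;  // currentReader.getProgress()
    double consumed = before + progress * (after - before);
    System.out.println(consumed);                    // prints 0.375
  }
}
```

Note that the calculation does not consult the actual byte sizes of the splits; `splitAtFraction` is likewise left unimplemented, as the comment in the diff notes.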
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
new file mode 100644
index 0000000..814a762
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.io.Writable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * A {@code WritableCoder} is a {@link org.apache.beam.sdk.coders.Coder} for a
+ * Java class that implements {@link org.apache.hadoop.io.Writable}.
+ *
+ * <p> To use, specify the coder type on a PCollection:
+ * <pre>
+ * {@code
+ *   PCollection<MyRecord> records =
+ *       foo.apply(...).setCoder(WritableCoder.of(MyRecord.class));
+ * }
+ * </pre>
+ *
+ * @param <T> the type of elements handled by this coder
+ */
+public class WritableCoder<T extends Writable> extends StandardCoder<T> {
+  private static final long serialVersionUID = 0L;
+
+  /**
+   * Returns a {@code WritableCoder} instance for the provided element class.
+   * @param <T> the element type
+   */
+  public static <T extends Writable> WritableCoder<T> of(Class<T> clazz) {
+    return new WritableCoder<>(clazz);
+  }
+
+  @JsonCreator
+  @SuppressWarnings("unchecked")
+  public static WritableCoder<?> of(@JsonProperty("type") String classType)
+      throws ClassNotFoundException {
+    Class<?> clazz = Class.forName(classType);
+    if (!Writable.class.isAssignableFrom(clazz)) {
+      throw new ClassNotFoundException(
+          "Class " + classType + " does not implement Writable");
+    }
+    return of((Class<? extends Writable>) clazz);
+  }
+
+  private final Class<T> type;
+
+  public WritableCoder(Class<T> type) {
+    this.type = type;
+  }
+
+  @Override
+  public void encode(T value, OutputStream outStream, Context context) throws IOException {
+    value.write(new DataOutputStream(outStream));
+  }
+
+  @Override
+  public T decode(InputStream inStream, Context context) throws IOException {
+    try {
+      T t = type.newInstance();
+      t.readFields(new DataInputStream(inStream));
+      return t;
+    } catch (InstantiationException | IllegalAccessException e) {
+      throw new CoderException("unable to deserialize record", e);
+    }
+  }
+
+  @Override
+  public List<Coder<?>> getCoderArguments() {
+    return null;
+  }
+
+  @Override
+  public CloudObject asCloudObject() {
+    CloudObject result = super.asCloudObject();
+    result.put("type", type.getName());
+    return result;
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    throw new NonDeterministicException(this,
+        "Hadoop Writable may be non-deterministic.");
+  }
+
+}
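For readers unfamiliar with Beam coders, the class above can be exercised outside a pipeline with a plain encode/decode round trip. A minimal sketch, not part of this commit, assuming the SDK's `Coder.Context.OUTER` constant and Hadoop's `IntWritable` (the same pairing used by `WritableCoderTest` later in this diff):

```java
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.hdfs.WritableCoder;

import org.apache.hadoop.io.IntWritable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

public class WritableCoderRoundTrip {
  public static void main(String[] args) throws Exception {
    WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class);

    // encode(value, stream, context) delegates to Writable#write ...
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    coder.encode(new IntWritable(42), out, Coder.Context.OUTER);

    // ... and decode(stream, context) instantiates the type and calls readFields.
    IntWritable copy = coder.decode(
        new ByteArrayInputStream(out.toByteArray()), Coder.Context.OUTER);
    System.out.println(copy.get()); // 42
  }
}
```

Since `verifyDeterministic()` always throws, the SDK will regard values encoded this way as potentially non-deterministic, which is worth keeping in mind if the coder is ever used for grouping keys.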
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
new file mode 100644
index 0000000..67df7bc
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.values.KV;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Tests for HDFSFileSource.
+ */
+public class HDFSFileSourceTest {
+
+  Random random = new Random(0L);
+
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Test
+  public void testFullyReadSingleFile() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
+    File file = createFileWithData("tmp.seq", expectedResults);
+
+    HDFSFileSource<IntWritable, Text> source =
+        HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
+            IntWritable.class, Text.class);
+
+    assertEquals(file.length(), source.getEstimatedSizeBytes(null));
+
+    assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
+  }
+
+  @Test
+  public void testFullyReadFilePattern() throws IOException {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
+    File file1 = createFileWithData("file1", data1);
+
+    List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
+    createFileWithData("file2", data2);
+
+    List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
+    createFileWithData("file3", data3);
+
+    List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
+    createFileWithData("otherfile", data4);
+
+    HDFSFileSource<IntWritable, Text> source =
+        HDFSFileSource.from(new File(file1.getParent(), "file*").toString(),
+            SequenceFileInputFormat.class, IntWritable.class, Text.class);
+    List<KV<IntWritable, Text>> expectedResults = new ArrayList<>();
+    expectedResults.addAll(data1);
+    expectedResults.addAll(data2);
+    expectedResults.addAll(data3);
+    assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
+  }
+
+  @Test
+  public void testCloseUnstartedFilePatternReader() throws IOException {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
+    File file1 = createFileWithData("file1", data1);
+
+    List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
+    createFileWithData("file2", data2);
+
+    List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
+    createFileWithData("file3", data3);
+
+    List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
+    createFileWithData("otherfile", data4);
+
+    HDFSFileSource<IntWritable, Text> source =
+        HDFSFileSource.from(new File(file1.getParent(), "file*").toString(),
+            SequenceFileInputFormat.class, IntWritable.class, Text.class);
+    Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options);
+    // Closing an unstarted FilePatternReader should not throw an exception.
+    try {
+      reader.close();
+    } catch (Exception e) {
+      fail("Closing an unstarted FilePatternReader should not throw an exception");
+    }
+  }
+
+  @Test
+  public void testSplits() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
+    File file = createFileWithData("tmp.avro", expectedResults);
+
+    HDFSFileSource<IntWritable, Text> source =
+        HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
+            IntWritable.class, Text.class);
+
+    // Assert that the source produces the expected records
+    assertEquals(expectedResults, readFromSource(source, options));
+
+    // Split with a small bundle size (has to be at least size of sync interval)
+    List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source
+        .splitIntoBundles(SequenceFile.SYNC_INTERVAL, options);
+    assertTrue(splits.size() > 2);
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
+    int nonEmptySplits = 0;
+    for (BoundedSource<KV<IntWritable, Text>> subSource : splits) {
+      if (readFromSource(subSource, options).size() > 0) {
+        nonEmptySplits += 1;
+      }
+    }
+    assertTrue(nonEmptySplits > 2);
+  }
+
+  private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
+      throws IOException {
+    File tmpFile = tmpFolder.newFile(filename);
+    try (Writer writer = SequenceFile.createWriter(new Configuration(),
+        Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
+        Writer.file(new Path(tmpFile.toURI())))) {
+
+      for (KV<IntWritable, Text> record : records) {
+        writer.append(record.getKey(), record.getValue());
+      }
+    }
+    return tmpFile;
+  }
+
+  private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength,
+      int numItems, int offset) {
+    List<KV<IntWritable, Text>> records = new ArrayList<>();
+    for (int i = 0; i < numItems; i++) {
+      IntWritable key = new IntWritable(i + offset);
+      Text value = new Text(createRandomString(dataItemLength));
+      records.add(KV.of(key, value));
+    }
+    return records;
+  }
+
+  private String createRandomString(int length) {
+    char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < length; i++) {
+      builder.append(chars[random.nextInt(chars.length)]);
+    }
+    return builder.toString();
+  }
+
+}
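The tests above drive the source directly through `SourceTestUtils`; in a real pipeline the source would typically be wrapped in a `Read` transform instead. A rough sketch, not part of this commit: only the `HDFSFileSource.from(...)` call is taken from the code above, while the `Read.from(...)` wiring and the input path are assumptions about the surrounding SDK.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.hdfs.HDFSFileSource;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

public class HDFSFileSourceSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // "hdfs:///tmp/data/*" is a placeholder pattern, not taken from the commit.
    HDFSFileSource<IntWritable, Text> source =
        HDFSFileSource.from("hdfs:///tmp/data/*", SequenceFileInputFormat.class,
            IntWritable.class, Text.class);

    // Read.from turns the BoundedSource into a PCollection of key/value pairs.
    PCollection<KV<IntWritable, Text>> records = p.apply(Read.from(source));

    p.run();
  }
}
```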
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
new file mode 100644
index 0000000..715da8e
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import org.apache.beam.sdk.testing.CoderProperties;
+
+import org.apache.hadoop.io.IntWritable;
+import org.junit.Test;
+
+/**
+ * Tests for WritableCoder.
+ */
+public class WritableCoderTest {
+
+  @Test
+  public void testIntWritableEncoding() throws Exception {
+    IntWritable value = new IntWritable(42);
+    WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class);
+
+    CoderProperties.coderDecodeEncodeEqual(coder, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
new file mode 100644
index 0000000..75f192c
--- /dev/null
+++ b/sdks/java/io/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>java-sdk-parent</artifactId>
+    <version>0.1.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>io-parent</artifactId>
+  <packaging>pom</packaging>
+  <name>Apache Beam :: SDKs :: Java :: IO</name>
+  <description>Beam SDK Java IO provides different connectivity components
+    (sources and sinks) to consume and produce data from systems.</description>
+
+  <modules>
+    <module>hdfs</module>
+  </modules>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/404b633d/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 1111d92..6bd7ee7 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -36,6 +36,7 @@
   <modules>
     <module>core</module>
+    <module>io</module>
     <!-- sdks/java/maven-archtypes has several dependencies on the DataflowPipelineRunner. Until these are refactored out or a released artifact exists, we need to modify the build order.