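Replace HadoopIO.Read.Bound factory method and fluent setters with a single HadoopIO.Read.from(filepattern, format, key, value) factory; always set and null-check filepattern, format, key and value at creation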
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b47a8d0a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b47a8d0a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b47a8d0a Branch: refs/heads/master Commit: b47a8d0a31f8f160a26cd5b41b3317857969f66a Parents: b2f495e Author: Sean Owen <so...@cloudera.com> Authored: Mon Jul 6 12:23:07 2015 +0100 Committer: Tom White <t...@cloudera.com> Committed: Thu Mar 10 11:15:14 2016 +0000 ---------------------------------------------------------------------- .../com/cloudera/dataflow/hadoop/HadoopIO.java | 30 +++++++------------- .../spark/HadoopFileFormatPipelineTest.java | 4 +-- 2 files changed, 12 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b47a8d0a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java index 803c495..587e66e 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java @@ -32,8 +32,9 @@ public final class HadoopIO { private Read() { } - public static <K, V> Bound<K, V> withKeyValueClass(Class<K> key, Class<V> value) { - return new Bound<>(null, null, key, value); + public static <K, V> Bound<K, V> from(String filepattern, Class<? extends FileInputFormat<K, V>> format, + Class<K> key, Class<V> value) { + return new Bound<>(filepattern, format, key, value); } public static class Bound<K, V> extends PTransform<PInput, PCollection<KV<K, V>>> { @@ -45,20 +46,20 @@ public final class HadoopIO { Bound(String filepattern, Class<? 
extends FileInputFormat<K, V>> format, Class<K> key, Class<V> value) { + Preconditions.checkNotNull(filepattern, + "need to set the filepattern of an HadoopIO.Read transform"); + Preconditions.checkNotNull(format, + "need to set the format class of an HadoopIO.Read transform"); + Preconditions.checkNotNull(key, + "need to set the key class of an HadoopIO.Read transform"); + Preconditions.checkNotNull(value, + "need to set the value class of an HadoopIO.Read transform"); this.filepattern = filepattern; this.formatClass = format; this.keyClass = key; this.valueClass = value; } - public Bound<K, V> from(String file) { - return new Bound<>(file, formatClass, keyClass, valueClass); - } - - public Bound<K, V> withFormatClass(Class<? extends FileInputFormat<K, V>> format) { - return new Bound<>(filepattern, format, keyClass, valueClass); - } - public String getFilepattern() { return filepattern; } @@ -77,15 +78,6 @@ public final class HadoopIO { @Override public PCollection<KV<K, V>> apply(PInput input) { - Preconditions.checkNotNull(filepattern, - "need to set the filepattern of an HadoopIO.Read transform"); - Preconditions.checkNotNull(formatClass, - "need to set the format class of an HadoopIO.Read transform"); - Preconditions.checkNotNull(keyClass, - "need to set the key class of an HadoopIO.Read transform"); - Preconditions.checkNotNull(valueClass, - "need to set the value class of an HadoopIO.Read transform"); - return PCollection.createPrimitiveOutputInternal(input.getPipeline(), WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b47a8d0a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java 
index 127d58f..ba6f7b0 100644 --- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java +++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java @@ -68,9 +68,7 @@ public class HadoopFileFormatPipelineTest { Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass = (Class<? extends FileInputFormat<IntWritable, Text>>) (Class<?>) SequenceFileInputFormat.class; HadoopIO.Read.Bound<IntWritable,Text> bound = - HadoopIO.Read.withKeyValueClass(IntWritable.class, Text.class). - from(inputFile.getAbsolutePath()) - .withFormatClass(inputFormatClass); + HadoopIO.Read.from(inputFile.getAbsolutePath(), inputFormatClass, IntWritable.class, Text.class); PCollection<KV<IntWritable, Text>> input = p.apply(bound); input.apply(ParDo.of(new TabSeparatedString())) .apply(TextIO.Write.to(outputFile.getAbsolutePath()).withoutSharding());