Add support for writes with HadoopIO. This allows Hadoop FileOutputFormats to be used with Spark Dataflow, as long as they implement the ShardNameTemplateAware interface. That is easily achieved by subclassing the desired FileOutputFormat class; see TemplatedSequenceFileOutputFormat for an example. A usage sketch follows below.
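A minimal end-to-end sketch of the new transform, adapted from HadoopFileFormatPipelineTest in this patch. The class name HadoopWriteExample and the input/output paths are placeholders, not part of the patch; the unchecked casts mirror the raw-typed Write.to factory, as in the test.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

import com.cloudera.dataflow.hadoop.HadoopIO;
import com.cloudera.dataflow.spark.SparkPipelineRunner;
import com.cloudera.dataflow.spark.TemplatedSequenceFileOutputFormat;

public class HadoopWriteExample {
  @SuppressWarnings("unchecked")
  public static void main(String[] args) throws Exception {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // Read <IntWritable, Text> pairs from an existing SequenceFile.
    Class<? extends FileInputFormat<IntWritable, Text>> inputFormat =
        (Class<? extends FileInputFormat<IntWritable, Text>>) (Class<?>) SequenceFileInputFormat.class;
    PCollection<KV<IntWritable, Text>> records =
        p.apply(HadoopIO.Read.from("/path/to/input", inputFormat, IntWritable.class, Text.class));

    // Write them back out through a ShardNameTemplateAware FileOutputFormat.
    Class<? extends FileOutputFormat<IntWritable, Text>> outputFormat =
        (Class<? extends FileOutputFormat<IntWritable, Text>>) (Class<?>) TemplatedSequenceFileOutputFormat.class;
    HadoopIO.Write.Bound<IntWritable, Text> write =
        HadoopIO.Write.to("/path/to/output", outputFormat, IntWritable.class, Text.class);
    records.apply(write.withoutSharding());

    // Run on the Spark runner and release its resources.
    SparkPipelineRunner.create().run(p).close();
  }
}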
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/78388659 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/78388659 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/78388659 Branch: refs/heads/master Commit: 783886592a3f6baf409fb06a75fe4c4994def74a Parents: 0c84c9d Author: Tom White <t...@cloudera.com> Authored: Fri Jun 5 09:14:16 2015 +0200 Committer: Tom White <t...@cloudera.com> Committed: Thu Mar 10 11:15:16 2016 +0000 ---------------------------------------------------------------------- runners/spark/build-resources/checkstyle.xml | 6 +- .../com/cloudera/dataflow/hadoop/HadoopIO.java | 114 +++++++++++++++ .../cloudera/dataflow/hadoop/WritableCoder.java | 112 +++++++++++++++ .../dataflow/spark/ShardNameTemplateAware.java | 28 ++++ .../dataflow/spark/ShardNameTemplateHelper.java | 58 ++++++++ .../spark/TemplatedAvroKeyOutputFormat.java | 40 ++++++ .../TemplatedSequenceFileOutputFormat.java | 40 ++++++ .../spark/TemplatedTextOutputFormat.java | 24 +--- .../dataflow/spark/TransformTranslator.java | 137 ++++++++++++++----- .../dataflow/spark/AvroPipelineTest.java | 4 +- .../spark/HadoopFileFormatPipelineTest.java | 46 ++++--- 11 files changed, 530 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78388659/runners/spark/build-resources/checkstyle.xml ---------------------------------------------------------------------- diff --git a/runners/spark/build-resources/checkstyle.xml b/runners/spark/build-resources/checkstyle.xml index 716270a..c5b884d 100644 --- a/runners/spark/build-resources/checkstyle.xml +++ b/runners/spark/build-resources/checkstyle.xml @@ -115,7 +115,9 @@ <property name="max" value="100"/> </module> <module name="MethodLength"/> - <module name="ParameterNumber"/> + <module name="ParameterNumber"> + <property name="max" value="8"/> + </module> <module name="OuterTypeNumber"/> <!-- Checks for whitespace --> @@ -180,7 +182,7 @@ <module name="IllegalInstantiation"/> <module name="InnerAssignment"/> <module name="MissingSwitchDefault"/> - <module name="RedundantThrows"/> + <!--<module name="RedundantThrows"/>--> <module name="SimplifyBooleanExpression"/> <module name="SimplifyBooleanReturn"/> <module name="DefaultComesLast"/> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78388659/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java index 533dd30..6389db3 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java @@ -14,13 +14,21 @@ */ package com.cloudera.dataflow.hadoop; +import java.util.HashMap; +import java.util.Map; + +import com.google.cloud.dataflow.sdk.io.ShardNameTemplate; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.util.WindowingStrategy; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PDone; import com.google.cloud.dataflow.sdk.values.PInput; import com.google.common.base.Preconditions; import 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +import com.cloudera.dataflow.spark.ShardNameTemplateAware; public final class HadoopIO { @@ -85,4 +93,110 @@ public final class HadoopIO { } } + + public static final class Write { + + private Write() { + } + + public static <K, V> Bound to(String filenamePrefix, + Class<? extends FileOutputFormat<K, V>> format, Class<K> key, Class<V> value) { + return new Bound<>(filenamePrefix, format, key, value); + } + + public static class Bound<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> { + + /** The filename to write to. */ + private final String filenamePrefix; + /** Suffix to use for each filename. */ + private final String filenameSuffix; + /** Requested number of shards. 0 for automatic. */ + private final int numShards; + /** Shard template string. */ + private final String shardTemplate; + private final Class<? extends FileOutputFormat<K, V>> formatClass; + private final Class<K> keyClass; + private final Class<V> valueClass; + private final Map<String, String> configurationProperties; + + Bound(String filenamePrefix, Class<? extends FileOutputFormat<K, V>> format, + Class<K> key, + Class<V> value) { + this(filenamePrefix, "", 0, ShardNameTemplate.INDEX_OF_MAX, format, key, value, + new HashMap<String, String>()); + } + + Bound(String filenamePrefix, String filenameSuffix, int numShards, + String shardTemplate, Class<? extends FileOutputFormat<K, V>> format, + Class<K> key, Class<V> value, Map<String, String> configurationProperties) { + this.filenamePrefix = filenamePrefix; + this.filenameSuffix = filenameSuffix; + this.numShards = numShards; + this.shardTemplate = shardTemplate; + this.formatClass = format; + this.keyClass = key; + this.valueClass = value; + this.configurationProperties = configurationProperties; + } + + public Bound<K, V> withoutSharding() { + return new Bound<>(filenamePrefix, filenameSuffix, 1, "", formatClass, + keyClass, valueClass, configurationProperties); + } + + public Bound<K, V> withConfigurationProperty(String key, String value) { + configurationProperties.put(key, value); + return this; + } + + public String getFilenamePrefix() { + return filenamePrefix; + } + + public String getShardTemplate() { + return shardTemplate; + } + + public int getNumShards() { + return numShards; + } + + public String getFilenameSuffix() { + return filenameSuffix; + } + + public Class<? 
extends FileOutputFormat<K, V>> getFormatClass() { + return formatClass; + } + + public Class<V> getValueClass() { + return valueClass; + } + + public Class<K> getKeyClass() { + return keyClass; + } + + public Map<String, String> getConfigurationProperties() { + return configurationProperties; + } + + @Override + public PDone apply(PCollection<KV<K, V>> input) { + Preconditions.checkNotNull(filenamePrefix, + "need to set the filename prefix of an HadoopIO.Write transform"); + Preconditions.checkNotNull(formatClass, + "need to set the format class of an HadoopIO.Write transform"); + Preconditions.checkNotNull(keyClass, + "need to set the key class of an HadoopIO.Write transform"); + Preconditions.checkNotNull(valueClass, + "need to set the value class of an HadoopIO.Write transform"); + + Preconditions.checkArgument(ShardNameTemplateAware.class.isAssignableFrom(formatClass), + "Format class must implement " + ShardNameTemplateAware.class.getName()); + + return PDone.in(input.getPipeline()); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78388659/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java new file mode 100644 index 0000000..dbc6779 --- /dev/null +++ b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. + */ + +package com.cloudera.dataflow.hadoop; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.CoderException; +import com.google.cloud.dataflow.sdk.coders.StandardCoder; +import com.google.cloud.dataflow.sdk.util.CloudObject; +import org.apache.hadoop.io.Writable; + +/** + * A {@code WritableCoder} is a {@link com.google.cloud.dataflow.sdk.coders.Coder} for a + * Java class that implements {@link org.apache.hadoop.io.Writable}. + * + * <p> To use, specify the coder type on a PCollection: + * <pre> + * {@code + * PCollection<MyRecord> records = + * foo.apply(...).setCoder(WritableCoder.of(MyRecord.class)); + * } + * </pre> + * + * @param <T> the type of elements handled by this coder + */ +public class WritableCoder<T extends Writable> extends StandardCoder<T> { + private static final long serialVersionUID = 0L; + + /** + * Returns a {@code WritableCoder} instance for the provided element class. 
+ * @param <T> the element type + * @param clazz the element class + * @return a {@code WritableCoder} instance for the provided element class + */ + public static <T extends Writable> WritableCoder<T> of(Class<T> clazz) { + return new WritableCoder<>(clazz); + } + + @JsonCreator + @SuppressWarnings("unchecked") + public static WritableCoder<?> of(@JsonProperty("type") String classType) + throws ClassNotFoundException { + Class<?> clazz = Class.forName(classType); + if (!Writable.class.isAssignableFrom(clazz)) { + throw new ClassNotFoundException( + "Class " + classType + " does not implement Writable"); + } + return of((Class<? extends Writable>) clazz); + } + + private final Class<T> type; + + public WritableCoder(Class<T> type) { + this.type = type; + } + + @Override + public void encode(T value, OutputStream outStream, Context context) throws IOException { + value.write(new DataOutputStream(outStream)); + } + + @Override + public T decode(InputStream inStream, Context context) throws IOException { + try { + T t = type.newInstance(); + t.readFields(new DataInputStream(inStream)); + return t; + } catch (InstantiationException | IllegalAccessException e) { + throw new CoderException("unable to deserialize record", e); + } + } + + @Override + public List<Coder<?>> getCoderArguments() { + return null; + } + + @Override + public CloudObject asCloudObject() { + CloudObject result = super.asCloudObject(); + result.put("type", type.getName()); + return result; + } + + @Override + public void verifyDeterministic() throws Coder.NonDeterministicException { + throw new NonDeterministicException(this, + "Hadoop Writable may be non-deterministic."); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78388659/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateAware.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateAware.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateAware.java new file mode 100644 index 0000000..bb9a7a5 --- /dev/null +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateAware.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. + */ + +package com.cloudera.dataflow.spark; + +/** + * A marker interface that implementations of + * {@link org.apache.hadoop.mapreduce.lib.output.FileOutputFormat} implement to indicate + * that they produce shard names that adhere to the template in + * {@link com.cloudera.dataflow.hadoop.HadoopIO.Write}. + * + * Some common shard names are defined in + * {@link com.google.cloud.dataflow.sdk.io.ShardNameTemplate}. 
+ */ +public interface ShardNameTemplateAware { +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78388659/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateHelper.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateHelper.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateHelper.java new file mode 100644 index 0000000..56980a1 --- /dev/null +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/ShardNameTemplateHelper.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. + */ + +package com.cloudera.dataflow.spark; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.cloudera.dataflow.spark.ShardNameBuilder.replaceShardNumber; + +public final class ShardNameTemplateHelper { + + private static final Logger LOG = LoggerFactory.getLogger(ShardNameTemplateHelper.class); + + public static final String OUTPUT_FILE_PREFIX = "spark.dataflow.fileoutputformat.prefix"; + public static final String OUTPUT_FILE_TEMPLATE = "spark.dataflow.fileoutputformat.template"; + public static final String OUTPUT_FILE_SUFFIX = "spark.dataflow.fileoutputformat.suffix"; + + private ShardNameTemplateHelper() { + } + + public static <K, V> Path getDefaultWorkFile(FileOutputFormat<K, V> format, + TaskAttemptContext context) throws IOException { + FileOutputCommitter committer = + (FileOutputCommitter) format.getOutputCommitter(context); + return new Path(committer.getWorkPath(), getOutputFile(context)); + } + + private static String getOutputFile(TaskAttemptContext context) { + TaskID taskId = context.getTaskAttemptID().getTaskID(); + int partition = taskId.getId(); + + String filePrefix = context.getConfiguration().get(OUTPUT_FILE_PREFIX); + String fileTemplate = context.getConfiguration().get(OUTPUT_FILE_TEMPLATE); + String fileSuffix = context.getConfiguration().get(OUTPUT_FILE_SUFFIX); + return filePrefix + replaceShardNumber(fileTemplate, partition) + fileSuffix; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78388659/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedAvroKeyOutputFormat.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedAvroKeyOutputFormat.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedAvroKeyOutputFormat.java new file mode 100644 index 0000000..ef24137 --- /dev/null +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedAvroKeyOutputFormat.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2015, 
Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. + */ + +package com.cloudera.dataflow.spark; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.avro.mapreduce.AvroKeyOutputFormat; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +public class TemplatedAvroKeyOutputFormat<T> extends AvroKeyOutputFormat<T> + implements ShardNameTemplateAware { + + @Override + public void checkOutputSpecs(JobContext job) { + // don't fail if the output already exists + } + + @Override + protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException { + Path path = ShardNameTemplateHelper.getDefaultWorkFile(this, context); + return path.getFileSystem(context.getConfiguration()).create(path); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78388659/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedSequenceFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedSequenceFileOutputFormat.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedSequenceFileOutputFormat.java new file mode 100644 index 0000000..3ab07b5 --- /dev/null +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedSequenceFileOutputFormat.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. 
+ */ + +package com.cloudera.dataflow.spark; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; + +public class TemplatedSequenceFileOutputFormat<K, V> extends SequenceFileOutputFormat<K, V> + implements ShardNameTemplateAware { + + @Override + public void checkOutputSpecs(JobContext job) { + // don't fail if the output already exists + } + + @Override + public Path getDefaultWorkFile(TaskAttemptContext context, + String extension) throws IOException { + // note that the passed-in extension is ignored since it comes from the template + return ShardNameTemplateHelper.getDefaultWorkFile(this, context); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78388659/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedTextOutputFormat.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedTextOutputFormat.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedTextOutputFormat.java index 5d00900..a8e218d 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedTextOutputFormat.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TemplatedTextOutputFormat.java @@ -20,17 +20,10 @@ import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskID; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import static com.cloudera.dataflow.spark.ShardNameBuilder.replaceShardNumber; - -public class TemplatedTextOutputFormat<K, V> extends TextOutputFormat<K, V> { - - public static final String OUTPUT_FILE_PREFIX = "spark.dataflow.textoutputformat.prefix"; - public static final String OUTPUT_FILE_TEMPLATE = "spark.dataflow.textoutputformat.template"; - public static final String OUTPUT_FILE_SUFFIX = "spark.dataflow.textoutputformat.suffix"; +public class TemplatedTextOutputFormat<K, V> extends TextOutputFormat<K, V> + implements ShardNameTemplateAware { @Override public void checkOutputSpecs(JobContext job) { @@ -41,18 +34,7 @@ public class TemplatedTextOutputFormat<K, V> extends TextOutputFormat<K, V> { public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException { // note that the passed-in extension is ignored since it comes from the template - FileOutputCommitter committer = - (FileOutputCommitter) getOutputCommitter(context); - return new Path(committer.getWorkPath(), getOutputFile(context)); + return ShardNameTemplateHelper.getDefaultWorkFile(this, context); } - private String getOutputFile(TaskAttemptContext context) { - TaskID taskId = context.getTaskAttemptID().getTaskID(); - int partition = taskId.getId(); - - String filePrefix = context.getConfiguration().get(OUTPUT_FILE_PREFIX); - String fileTemplate = context.getConfiguration().get(OUTPUT_FILE_TEMPLATE); - String fileSuffix = context.getConfiguration().get(OUTPUT_FILE_SUFFIX); - return filePrefix + replaceShardNumber(fileTemplate, partition) + fileSuffix; - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78388659/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java 
---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java index ee300fd..dfb01f1 100644 --- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java +++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java @@ -48,11 +48,11 @@ import com.google.common.collect.Iterables; import org.apache.avro.mapred.AvroKey; import org.apache.avro.mapreduce.AvroJob; import org.apache.avro.mapreduce.AvroKeyInputFormat; -import org.apache.avro.mapreduce.AvroKeyOutputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDDLike; @@ -409,27 +409,12 @@ public final class TransformTranslator { return new Tuple2<>(t, null); } }); - int shardCount = transform.getNumShards(); - if (shardCount == 0) { - // use default number of shards, but find the actual number for the template - shardCount = last.partitions().size(); - } else { - // number of shards was set explicitly, so repartition - last = last.repartition(transform.getNumShards()); - } - - String template = replaceShardCount(transform.getShardTemplate(), shardCount); - String outputDir = getOutputDirectory(transform.getFilenamePrefix(), template); - String filePrefix = getOutputFilePrefix(transform.getFilenamePrefix(), template); - String fileTemplate = getOutputFileTemplate(transform.getFilenamePrefix(), template); - String fileSuffix = transform.getFilenameSuffix(); - - Configuration conf = new Configuration(); - conf.set(TemplatedTextOutputFormat.OUTPUT_FILE_PREFIX, filePrefix); - conf.set(TemplatedTextOutputFormat.OUTPUT_FILE_TEMPLATE, fileTemplate); - conf.set(TemplatedTextOutputFormat.OUTPUT_FILE_SUFFIX, fileSuffix); - last.saveAsNewAPIHadoopFile(outputDir, Text.class, NullWritable.class, - TemplatedTextOutputFormat.class, conf); + ShardTemplateInformation shardTemplateInfo = + new ShardTemplateInformation(transform.getNumShards(), + transform.getShardTemplate(), transform.getFilenamePrefix(), + transform.getFilenameSuffix()); + writeHadoopFile(last, new Configuration(), shardTemplateInfo, Text.class, + NullWritable.class, TemplatedTextOutputFormat.class); } }; } @@ -462,9 +447,6 @@ public final class TransformTranslator { return new TransformEvaluator<AvroIO.Write.Bound<T>>() { @Override public void evaluate(AvroIO.Write.Bound<T> transform, EvaluationContext context) { - String pattern = transform.getFilenamePrefix(); - @SuppressWarnings("unchecked") - JavaRDDLike<T, ?> last = (JavaRDDLike<T, ?>) context.getInputRDD(transform); Job job; try { job = Job.getInstance(); @@ -472,14 +454,21 @@ public final class TransformTranslator { throw new IllegalStateException(e); } AvroJob.setOutputKeySchema(job, transform.getSchema()); - last.mapToPair(new PairFunction<T, AvroKey<T>, NullWritable>() { - @Override - public Tuple2<AvroKey<T>, NullWritable> call(T t) throws Exception { - return new Tuple2<>(new AvroKey<>(t), NullWritable.get()); - }}) - .saveAsNewAPIHadoopFile(pattern, AvroKey.class, NullWritable.class, - AvroKeyOutputFormat.class, job.getConfiguration()); - + @SuppressWarnings("unchecked") + 
JavaPairRDD<AvroKey<T>, NullWritable> last = + ((JavaRDDLike<T, ?>) context.getInputRDD(transform)) + .mapToPair(new PairFunction<T, AvroKey<T>, NullWritable>() { + @Override + public Tuple2<AvroKey<T>, NullWritable> call(T t) throws Exception { + return new Tuple2<>(new AvroKey<>(t), NullWritable.get()); + } + }); + ShardTemplateInformation shardTemplateInfo = + new ShardTemplateInformation(transform.getNumShards(), + transform.getShardTemplate(), transform.getFilenamePrefix(), + transform.getFilenameSuffix()); + writeHadoopFile(last, job.getConfiguration(), shardTemplateInfo, + AvroKey.class, NullWritable.class, TemplatedAvroKeyOutputFormat.class); } }; } @@ -506,6 +495,87 @@ public final class TransformTranslator { }; } + private static <K, V> TransformEvaluator<HadoopIO.Write.Bound<K, V>> writeHadoop() { + return new TransformEvaluator<HadoopIO.Write.Bound<K, V>>() { + @Override + public void evaluate(HadoopIO.Write.Bound<K, V> transform, EvaluationContext context) { + @SuppressWarnings("unchecked") + JavaPairRDD<K, V> last = ((JavaRDDLike<KV<K, V>, ?>) context + .getInputRDD(transform)) + .mapToPair(new PairFunction<KV<K, V>, K, V>() { + @Override + public Tuple2<K, V> call(KV<K, V> t) throws Exception { + return new Tuple2<>(t.getKey(), t.getValue()); + } + }); + ShardTemplateInformation shardTemplateInfo = + new ShardTemplateInformation(transform.getNumShards(), + transform.getShardTemplate(), transform.getFilenamePrefix(), + transform.getFilenameSuffix()); + Configuration conf = new Configuration(); + for (Map.Entry<String, String> e : transform.getConfigurationProperties().entrySet()) { + conf.set(e.getKey(), e.getValue()); + } + writeHadoopFile(last, conf, shardTemplateInfo, + transform.getKeyClass(), transform.getValueClass(), transform.getFormatClass()); + } + }; + } + + private static final class ShardTemplateInformation { + private final int numShards; + private final String shardTemplate; + private final String filenamePrefix; + private final String filenameSuffix; + + private ShardTemplateInformation(int numShards, String shardTemplate, String + filenamePrefix, String filenameSuffix) { + this.numShards = numShards; + this.shardTemplate = shardTemplate; + this.filenamePrefix = filenamePrefix; + this.filenameSuffix = filenameSuffix; + } + + public int getNumShards() { + return numShards; + } + + public String getShardTemplate() { + return shardTemplate; + } + + public String getFilenamePrefix() { + return filenamePrefix; + } + + public String getFilenameSuffix() { + return filenameSuffix; + } + } + + private static <K, V> void writeHadoopFile(JavaPairRDD<K, V> rdd, Configuration conf, + ShardTemplateInformation shardTemplateInfo, Class<?> keyClass, Class<?> valueClass, + Class<? 
extends FileOutputFormat> formatClass) { + int numShards = shardTemplateInfo.getNumShards(); + String shardTemplate = shardTemplateInfo.getShardTemplate(); + String filenamePrefix = shardTemplateInfo.getFilenamePrefix(); + String filenameSuffix = shardTemplateInfo.getFilenameSuffix(); + if (numShards != 0) { + // number of shards was set explicitly, so repartition + rdd = rdd.repartition(numShards); + } + int actualNumShards = rdd.partitions().size(); + String template = replaceShardCount(shardTemplate, actualNumShards); + String outputDir = getOutputDirectory(filenamePrefix, template); + String filePrefix = getOutputFilePrefix(filenamePrefix, template); + String fileTemplate = getOutputFileTemplate(filenamePrefix, template); + + conf.set(ShardNameTemplateHelper.OUTPUT_FILE_PREFIX, filePrefix); + conf.set(ShardNameTemplateHelper.OUTPUT_FILE_TEMPLATE, fileTemplate); + conf.set(ShardNameTemplateHelper.OUTPUT_FILE_SUFFIX, filenameSuffix); + rdd.saveAsNewAPIHadoopFile(outputDir, keyClass, valueClass, formatClass, conf); + } + private static <T> TransformEvaluator<Window.Bound<T>> window() { return new TransformEvaluator<Window.Bound<T>>() { @Override @@ -613,6 +683,7 @@ public final class TransformTranslator { EVALUATORS.put(AvroIO.Read.Bound.class, readAvro()); EVALUATORS.put(AvroIO.Write.Bound.class, writeAvro()); EVALUATORS.put(HadoopIO.Read.Bound.class, readHadoop()); + EVALUATORS.put(HadoopIO.Write.Bound.class, writeHadoop()); EVALUATORS.put(ParDo.Bound.class, parDo()); EVALUATORS.put(ParDo.BoundMulti.class, multiDo()); EVALUATORS.put(GroupByKey.GroupByKeyOnly.class, gbk()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78388659/runners/spark/src/test/java/com/cloudera/dataflow/spark/AvroPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/AvroPipelineTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/AvroPipelineTest.java index 95f100c..ea4cc38 100644 --- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/AvroPipelineTest.java +++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/AvroPipelineTest.java @@ -90,8 +90,8 @@ public class AvroPipelineTest { private List<GenericRecord> readGenericFile() throws IOException { List<GenericRecord> records = Lists.newArrayList(); GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>(); - try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<> - (new File(outputDir, "part-r-00000.avro"), genericDatumReader)) { + try (DataFileReader<GenericRecord> dataFileReader = + new DataFileReader<>(new File(outputDir + "-00000-of-00001"), genericDatumReader)) { for (GenericRecord record : dataFileReader) { records.add(record); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78388659/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java index ba6f7b0..b351018 100644 --- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java +++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java @@ -17,26 +17,22 @@ package com.cloudera.dataflow.spark; import com.cloudera.dataflow.hadoop.HadoopIO; import 
com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.base.Charsets; -import com.google.common.io.Files; import java.io.File; import java.io.IOException; -import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.Reader; import org.apache.hadoop.io.SequenceFile.Writer; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -60,24 +56,39 @@ public class HadoopFileFormatPipelineTest { } @Test - public void testGeneric() throws Exception { + public void testSequenceFile() throws Exception { populateFile(); Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); @SuppressWarnings("unchecked") Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass = (Class<? extends FileInputFormat<IntWritable, Text>>) (Class<?>) SequenceFileInputFormat.class; - HadoopIO.Read.Bound<IntWritable,Text> bound = + HadoopIO.Read.Bound<IntWritable,Text> read = HadoopIO.Read.from(inputFile.getAbsolutePath(), inputFormatClass, IntWritable.class, Text.class); - PCollection<KV<IntWritable, Text>> input = p.apply(bound); - input.apply(ParDo.of(new TabSeparatedString())) - .apply(TextIO.Write.to(outputFile.getAbsolutePath()).withoutSharding()); + PCollection<KV<IntWritable, Text>> input = p.apply(read); + @SuppressWarnings("unchecked") + Class<? extends FileOutputFormat<IntWritable, Text>> outputFormatClass = + (Class<? extends FileOutputFormat<IntWritable, Text>>) (Class<?>) TemplatedSequenceFileOutputFormat.class; + @SuppressWarnings("unchecked") + HadoopIO.Write.Bound<IntWritable,Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(), + outputFormatClass, IntWritable.class, Text.class); + input.apply(write.withoutSharding()); EvaluationResult res = SparkPipelineRunner.create().run(p); res.close(); - List<String> records = Files.readLines(outputFile, Charsets.UTF_8); - for (int i = 0; i < 5; i++) { - assertEquals(i + "\tvalue-" + i, records.get(i)); + IntWritable key = new IntWritable(); + Text value = new Text(); + Reader reader = null; + try { + reader = new Reader(new Configuration(), Reader.file(new Path(outputFile.toURI()))); + int i = 0; + while(reader.next(key, value)) { + assertEquals(i, key.get()); + assertEquals("value-" + i, value.toString()); + i++; + } + } finally { + IOUtils.closeStream(reader); } } @@ -99,11 +110,4 @@ public class HadoopFileFormatPipelineTest { } } - static class TabSeparatedString extends DoFn<KV<IntWritable, Text>, String> { - @Override - public void processElement(ProcessContext c) throws Exception { - c.output(c.element().getKey().toString() + "\t" + c.element().getValue().toString()); - } - } - }
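To illustrate the subclassing approach described in the commit message, here is a hypothetical TemplatedMapFileOutputFormat that is not part of this patch. It follows the same pattern as TemplatedSequenceFileOutputFormat above, and assumes MapFileOutputFormat resolves its output path through getDefaultWorkFile the way the other formats here do.

package com.cloudera.dataflow.spark;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;

// Hypothetical example, not part of this patch: a template-aware MapFileOutputFormat,
// mirroring TemplatedSequenceFileOutputFormat.
public class TemplatedMapFileOutputFormat extends MapFileOutputFormat
    implements ShardNameTemplateAware {

  @Override
  public void checkOutputSpecs(JobContext job) {
    // don't fail if the output already exists
  }

  @Override
  public Path getDefaultWorkFile(TaskAttemptContext context,
      String extension) throws IOException {
    // the passed-in extension is ignored; the file name comes from the shard template
    return ShardNameTemplateHelper.getDefaultWorkFile(this, context);
  }
}

Such a subclass can then be passed to HadoopIO.Write.to in the same way as the formats added by this change.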