Updated Branches: refs/heads/master b46c2b8b4 -> 11e9b53e2
CRUNCH-306: One output path per key, Avro edition. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/11e9b53e Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/11e9b53e Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/11e9b53e Branch: refs/heads/master Commit: 11e9b53e27ac92bc255c5484866013aee48d5e93 Parents: b46c2b8 Author: Josh Wills <[email protected]> Authored: Mon Dec 9 21:23:38 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Wed Dec 11 12:08:26 2013 -0800 ---------------------------------------------------------------------- .../apache/crunch/io/avro/AvroPathPerKeyIT.java | 71 ++++++++++++ .../crunch/io/avro/AvroPathPerKeyTarget.java | 114 +++++++++++++++++++ .../apache/crunch/io/impl/FileTargetImpl.java | 4 +- .../crunch/types/avro/AvroOutputFormat.java | 42 ++++--- .../types/avro/AvroPathPerKeyOutputFormat.java | 92 +++++++++++++++ 5 files changed, 305 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/11e9b53e/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java new file mode 100644 index 0000000..7b30a60 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration test for {@link AvroPathPerKeyTarget}: writes a keyed table and checks that every key
+ * gets its own sub-directory containing a single Avro part file.
+ *
+ * <p>The class is {@link Serializable} because the anonymous {@link MapFn} below is an inner class of it
+ * and is shipped to the MapReduce tasks.
+ */
+public class AvroPathPerKeyIT extends CrunchTestSupport implements Serializable {
+  @Test
+  public void testOutputFilePerKey() throws Exception {
+    Pipeline pipeline = new MRPipeline(AvroPathPerKeyIT.class, tempDir.getDefaultConfiguration());
+    Path outputDir = tempDir.getPath("out");
+
+    // Each input line is "<key>\t<value>"; turn it into a (key, value) pair.
+    MapFn<String, Pair<String, String>> splitLine = new MapFn<String, Pair<String, String>>() {
+      @Override
+      public Pair<String, String> map(String line) {
+        String[] fields = line.split("\t");
+        return Pair.of(fields[0], fields[1]);
+      }
+    };
+
+    pipeline.read(From.textFile(tempDir.copyResourceFileName("docs.txt")))
+        .parallelDo(splitLine, Avros.tableOf(Avros.strings(), Avros.strings()))
+        .groupByKey()
+        .write(new AvroPathPerKeyTarget(outputDir));
+    pipeline.done();
+
+    // The output directory holds one directory per key plus the success marker.
+    FileSystem fs = outputDir.getFileSystem(tempDir.getDefaultConfiguration());
+    Set<String> topLevel = Sets.newHashSet();
+    for (FileStatus child : fs.listStatus(outputDir)) {
+      topLevel.add(child.getPath().getName());
+    }
+    assertEquals(ImmutableSet.of("A", "B", "_SUCCESS"), topLevel);
+
+    // Every key directory holds exactly one reducer part file.
+    for (String key : new String[] { "A", "B" }) {
+      FileStatus[] keyFiles = fs.listStatus(new Path(outputDir, key));
+      assertEquals(1, keyFiles.length);
+      assertEquals("part-r-00000.avro", keyFiles[0].getPath().getName());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/11e9b53e/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
new file mode 100644
index 0000000..c6be679
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.AvroPathPerKeyOutputFormat;
+import org.apache.crunch.types.avro.AvroMode;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+
+/**
+ * A {@link org.apache.crunch.Target} that wraps {@link org.apache.crunch.types.avro.AvroPathPerKeyOutputFormat} to allow one file
+ * per key to be written as the output of a {@code PTable<String, T>}.
+ *
+ * <p>Only Avro-typed tables whose key type is {@code String} are accepted; each key becomes a sub-directory of the
+ * target path holding the Avro-encoded values for that key.
+ *
+ * <p>Note the restrictions that apply to the {@code AvroPathPerKeyOutputFormat}; in particular, it's a good
+ * idea to write out all of the records for the same key together within each partition of the data.
+ */
+public class AvroPathPerKeyTarget extends FileTargetImpl {
+
+  public AvroPathPerKeyTarget(String path) {
+    this(new Path(path));
+  }
+
+  public AvroPathPerKeyTarget(Path path) {
+    super(path, AvroPathPerKeyOutputFormat.class, SequentialFileNamingScheme.getInstance());
+  }
+
+  /** Accepts only Avro {@code PTableType}s whose key class is {@code String}. */
+  @Override
+  public boolean accept(OutputHandler handler, PType<?> ptype) {
+    if (ptype instanceof PTableType && ptype instanceof AvroType) {
+      if (String.class.equals(((PTableType<?, ?>) ptype).getKeyType().getTypeClass())) {
+        handler.configure(this, ptype);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    AvroType<?> atype = (AvroType<?>) ((PTableType<?, ?>) ptype).getValueType();
+    FormatBundle bundle = FormatBundle.forOutput(AvroPathPerKeyOutputFormat.class);
+    // Named outputs store their schema under a per-name key, matching what
+    // AvroOutputFormat.getDataFileWriter reads when "crunch.namedoutput" is set.
+    String schemaParam = (name == null) ? "avro.output.schema" : "avro.output.schema." + name;
+    bundle.set(schemaParam, atype.getSchema().toString());
+    AvroMode.fromType(atype).configure(bundle);
+    configureForMapReduce(job, AvroWrapper.class, NullWritable.class, bundle, outputPath, name);
+  }
+
+  /**
+   * Moves the per-key directories written under {@code <workingPath>/<MULTI_OUTPUT_PREFIX><index>} into this
+   * target's path, then writes the {@code _SUCCESS} marker.
+   *
+   * <p>If the target directory does not exist yet and both paths are on compatible filesystems, the whole output
+   * directory is moved with a single rename. Otherwise the output is merged entry by entry, so that key directories
+   * which already exist under the target are filled in rather than having the new directory nested inside them.
+   *
+   * @throws IOException if a filesystem operation fails, including a rename that reports failure
+   */
+  @Override
+  public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
+    FileSystem srcFs = workingPath.getFileSystem(conf);
+    Path src = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index);
+    FileSystem dstFs = path.getFileSystem(conf);
+    boolean sameFs = isCompatible(srcFs, path);
+    if (!srcFs.exists(src)) {
+      // The output format only creates directories when it writes a record, so there is nothing to move.
+      dstFs.mkdirs(path);
+    } else if (sameFs && !dstFs.exists(path)) {
+      Path parent = path.getParent();
+      if (parent != null) {
+        dstFs.mkdirs(parent);
+      }
+      renameOrThrow(srcFs, src, path);
+    } else {
+      // Renaming or FileUtil.copy-ing a directory onto an existing directory places the source *inside* it,
+      // which would nest the output one level too deep; merge entry by entry instead.
+      dstFs.mkdirs(path);
+      moveContents(srcFs, src, dstFs, path, sameFs, conf);
+    }
+    dstFs.create(getSuccessIndicator(), true).close();
+  }
+
+  /** Recursively moves every entry under {@code srcDir} to the same relative location under {@code dstDir}. */
+  private void moveContents(FileSystem srcFs, Path srcDir, FileSystem dstFs, Path dstDir, boolean sameFs,
+      Configuration conf) throws IOException {
+    FileStatus[] children = srcFs.listStatus(srcDir);
+    if (children == null) {
+      return;
+    }
+    for (FileStatus child : children) {
+      Path s = child.getPath();
+      Path d = new Path(dstDir, s.getName());
+      if (child.isDir()) {
+        dstFs.mkdirs(d);
+        moveContents(srcFs, s, dstFs, d, sameFs, conf);
+      } else if (sameFs) {
+        renameOrThrow(srcFs, s, d);
+      } else {
+        FileUtil.copy(srcFs, s, dstFs, d, true, true, conf);
+      }
+    }
+  }
+
+  /** Renames {@code from} to {@code to}, turning a {@code false} result into an exception instead of losing data. */
+  private static void renameOrThrow(FileSystem fs, Path from, Path to) throws IOException {
+    if (!fs.rename(from, to)) {
+      throw new IOException("Failed to move " + from + " to " + to);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "AvroFilePerKey(" + path + ")";
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/11e9b53e/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index 8ae2589..7472e3d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -78,7 +78,7 @@ public class FileTargetImpl implements PathTarget {
 
   @Override
   public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
-    Converter converter = ptype.getConverter();
+    Converter converter = getConverter(ptype);
     Class keyClass = converter.getKeyClass();
     Class valueClass = converter.getValueClass();
     configureForMapReduce(job, keyClass, valueClass, formatBundle, outputPath, name);
@@ -139,7 +139,7 @@ public class FileTargetImpl implements PathTarget {
     dstFs.create(getSuccessIndicator(), true).close();
   }
 
-  private Path getSuccessIndicator() {
+  protected Path getSuccessIndicator() {
     return new Path(path, "_SUCCESS");
   }
 
http://git-wip-us.apache.org/repos/asf/crunch/blob/11e9b53e/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
index 526dabb..5a4499d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
@@ -25,6 +25,7 @@ import org.apache.avro.file.DataFileWriter;
 import
org.apache.avro.mapred.AvroJob;
 import org.apache.avro.mapred.AvroWrapper;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
@@ -35,51 +36,60 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 /** An {@link org.apache.hadoop.mapreduce.OutputFormat} for Avro data files. */
 public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
 
-  @Override
-  public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException,
-      InterruptedException {
-
-    Configuration conf = context.getConfiguration();
+  /**
+   * Opens an Avro {@link DataFileWriter} on {@code path}.
+   *
+   * <p>The schema is read from {@code avro.output.schema.<name>} when {@code crunch.namedoutput} is set, and from
+   * the job-level Avro output schema otherwise. Compression and sync-interval settings are taken from {@code conf}.
+   * If {@code path} already exists, new records are appended to it using the existing file's header.
+   *
+   * @param path the Avro container file to create or append to
+   * @param conf the configuration holding the schema and writer settings
+   * @return an open writer; the caller is responsible for closing it
+   * @throws IOException if the file cannot be opened, created or appended to
+   */
+  public static <S> DataFileWriter<S> getDataFileWriter(Path path, Configuration conf) throws IOException {
     Schema schema = null;
     String outputName = conf.get("crunch.namedoutput");
     if (outputName != null && !outputName.isEmpty()) {
       schema = (new Schema.Parser()).parse(conf.get("avro.output.schema." + outputName));
     } else {
-      schema = AvroJob.getOutputSchema(context.getConfiguration());
+      schema = AvroJob.getOutputSchema(conf);
     }
 
-    final DataFileWriter<T> WRITER = new DataFileWriter<T>(
-        AvroMode.fromConfiguration(conf).<T>getWriter(schema));
+    DataFileWriter<S> writer = new DataFileWriter<S>(AvroMode.fromConfiguration(conf).<S>getWriter(schema));
 
     JobConf jc = new JobConf(conf);
     /* copied from org.apache.avro.mapred.AvroOutputFormat */
-    
+
     if (org.apache.hadoop.mapred.FileOutputFormat.getCompressOutput(jc)) {
       int level = conf.getInt(org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY,
           org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL);
-      String codecName = conf.get(AvroJob.OUTPUT_CODEC, 
+      String codecName = conf.get(AvroJob.OUTPUT_CODEC,
           org.apache.avro.file.DataFileConstants.DEFLATE_CODEC);
       CodecFactory codec = codecName.equals(org.apache.avro.file.DataFileConstants.DEFLATE_CODEC) ?
           CodecFactory.deflateCodec(level) : CodecFactory.fromString(codecName);
-      WRITER.setCodec(codec);
+      writer.setCodec(codec);
     }
 
-    WRITER.setSyncInterval(jc.getInt(org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY,
+    writer.setSyncInterval(jc.getInt(org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY,
         org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL));
 
+    FileSystem fs = path.getFileSystem(conf);
+    if (fs.exists(path)) {
+      // An Avro container file has a single header (magic, metadata, sync marker). Calling create() on an
+      // append stream would write a second header after the existing blocks and make the file unreadable
+      // past the original data; appendTo() reads the existing header and continues with its sync marker.
+      writer.appendTo(new org.apache.avro.mapred.FsInput(path, conf), fs.append(path));
+    } else {
+      writer.create(schema, fs.create(path));
+    }
+    return writer;
+  }
+
+  @Override
+  public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException,
+      InterruptedException {
+
+    Configuration conf = context.getConfiguration();
     Path path = getDefaultWorkFile(context, org.apache.avro.mapred.AvroOutputFormat.EXT);
-    WRITER.create(schema, path.getFileSystem(context.getConfiguration()).create(path));
-    
+    final DataFileWriter<T> writer = getDataFileWriter(path, conf);
+
     return new RecordWriter<AvroWrapper<T>, NullWritable>() {
       @Override
       public void write(AvroWrapper<T> wrapper, NullWritable ignore) throws IOException {
-        WRITER.append(wrapper.datum());
+        writer.append(wrapper.datum());
       }
 
       @Override
       public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-        WRITER.close();
+        writer.close();
       }
     };
   }
http://git-wip-us.apache.org/repos/asf/crunch/blob/11e9b53e/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroPathPerKeyOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroPathPerKeyOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroPathPerKeyOutputFormat.java
new file mode 100644
index 0000000..da84fc0
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroPathPerKeyOutputFormat.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software
Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapred.Pair;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import java.io.IOException;
+
+/**
+ * A {@link FileOutputFormat} that takes Avro {@code Pair<Utf8, T>} records and writes each value to
+ * a sub-directory of the output path whose name is equal to the string form of the {@code Utf8} key.
+ *
+ * <p>This {@code OutputFormat} only keeps one {@code RecordWriter} open at a time, so it's a very good idea to write
+ * out all of the records for the same key at the same time within each partition so as not to be frequently opening
+ * and closing files. A key that reappears later in the same partition is appended to the file already written for it.
+ */
+public class AvroPathPerKeyOutputFormat<T> extends FileOutputFormat<AvroWrapper<Pair<Utf8, T>>, NullWritable> {
+
+  @Override
+  public RecordWriter<AvroWrapper<Pair<Utf8, T>>, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException {
+    Configuration conf = taskAttemptContext.getConfiguration();
+    // Write under the task attempt's work directory (where getDefaultWorkFile() also writes) so that the output
+    // committer only promotes the files of the successful attempt. Writing straight into the output path would let
+    // failed or speculative attempts leave partial files behind, which a retried attempt would then append to.
+    Path workPath = ((FileOutputCommitter) getOutputCommitter(taskAttemptContext)).getWorkPath();
+    Path basePath = new Path(workPath, conf.get("mapreduce.output.basename", "part"));
+    return new AvroFilePerKeyRecordWriter<T>(basePath, getUniqueFile(taskAttemptContext, "part", ".avro"), conf);
+  }
+
+  /**
+   * Routes each record's value to {@code <basePath>/<key>/<uniqueFileName>}, keeping at most one Avro file open.
+   * Static so it does not hold a reference to the enclosing output format.
+   */
+  private static class AvroFilePerKeyRecordWriter<T> extends RecordWriter<AvroWrapper<Pair<Utf8, T>>, NullWritable> {
+
+    private final Path basePath;
+    private final String uniqueFileName;
+    private final Configuration conf;
+    private String currentKey;
+    private DataFileWriter<T> currentWriter;
+
+    public AvroFilePerKeyRecordWriter(Path basePath, String uniqueFileName, Configuration conf) {
+      this.basePath = basePath;
+      this.uniqueFileName = uniqueFileName;
+      this.conf = conf;
+    }
+
+    @Override
+    public void write(AvroWrapper<Pair<Utf8, T>> record, NullWritable n) throws IOException, InterruptedException {
+      String key = record.datum().key().toString();
+      if (!key.equals(currentKey)) {
+        closeCurrentWriter();
+        // NOTE(review): the key is used verbatim as a path component; keys containing '/' or '..' may resolve
+        // outside a single child directory of basePath.
+        Path dir = new Path(basePath, key);
+        FileSystem fs = dir.getFileSystem(conf);
+        if (!fs.exists(dir)) {
+          fs.mkdirs(dir);
+        }
+        currentWriter = AvroOutputFormat.getDataFileWriter(new Path(dir, uniqueFileName), conf);
+        // Only remember the key once its writer is open, so a failed open is retried on the next record.
+        currentKey = key;
+      }
+      currentWriter.append(record.datum().value());
+    }
+
+    @Override
+    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+      closeCurrentWriter();
+    }
+
+    /** Closes the open writer, if any, clearing state first so a failing close is never retried on a dead writer. */
+    private void closeCurrentWriter() throws IOException {
+      DataFileWriter<T> writer = currentWriter;
+      currentWriter = null;
+      currentKey = null;
+      if (writer != null) {
+        writer.close();
+      }
+    }
+  }
+}
