Updated Branches: refs/heads/master 48cf308c8 -> 5578d5e8b
CRUNCH-199: Add support for Trevni sources and targets. Contributed by Micah Whitacre.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5578d5e8
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5578d5e8
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5578d5e8

Branch: refs/heads/master
Commit: 5578d5e8be97ff5d5b86815e005d9df387965f60
Parents: 48cf308
Author: Josh Wills <[email protected]>
Authored: Wed May 1 15:20:20 2013 -0700
Committer: Josh Wills <[email protected]>
Committed: Wed May 1 15:20:41 2013 -0700

----------------------------------------------------------------------
 crunch-core/pom.xml                                |    9 +
 .../io/avro/trevni/TrevniFileSourceTargetIT.java   |  133 ++++++++++
 .../crunch/io/avro/trevni/TrevniKeyPipelineIT.java |  195 +++++++++++++++
 .../io/avro/trevni/TrevniFileReaderFactory.java    |  106 ++++++++
 .../crunch/io/avro/trevni/TrevniKeySource.java     |   58 +++++
 .../io/avro/trevni/TrevniKeySourceTarget.java      |   40 +++
 .../crunch/io/avro/trevni/TrevniKeyTarget.java     |  146 +++++++++++
 pom.xml                                            |   18 ++-
 8 files changed, 704 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/5578d5e8/crunch-core/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-core/pom.xml b/crunch-core/pom.xml
index d365c3d..e4b6796 100644
--- a/crunch-core/pom.xml
+++ b/crunch-core/pom.xml
@@ -45,6 +45,15 @@ under the License.
     </dependency>
 
     <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>trevni-avro</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>trevni-core</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.javassist</groupId>
       <artifactId>javassist</artifactId>
     </dependency>


http://git-wip-us.apache.org/repos/asf/crunch/blob/5578d5e8/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniFileSourceTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniFileSourceTargetIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniFileSourceTargetIT.java
new file mode 100644
index 0000000..d591c65
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniFileSourceTargetIT.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro.trevni;
+
+import com.google.common.collect.Lists;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.Path;
+import org.apache.trevni.ColumnFileMetaData;
+import org.apache.trevni.avro.AvroColumnWriter;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@SuppressWarnings("serial")
+public class TrevniFileSourceTargetIT implements Serializable {
+
+  private transient File avroFile;
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Before
+  public void setUp() throws IOException {
+    avroFile = tmpDir.getFile("test.avro.trevni");
+  }
+
+  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
+    ColumnFileMetaData cfmd = new ColumnFileMetaData();
+    AvroColumnWriter writer = new AvroColumnWriter(schema, cfmd);
+
+    for (GenericRecord record : genericRecords) {
+      writer.write(record);
+    }
+
+    writer.writeTo(avroFile);
+  }
+
+  @Test
+  public void testSpecific() throws IOException {
+    GenericRecord savedRecord = new Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(TrevniFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(new TrevniKeySource(new Path(avroFile.getAbsolutePath()),
+        Avros.records(Person.class)));
+
+    List<Person> personList = Lists.newArrayList(genericCollection.materialize());
+
+    Person expectedPerson = new Person();
+    expectedPerson.name = "John Doe";
+    expectedPerson.age = 42;
+
+    List<CharSequence> siblingNames = Lists.newArrayList();
+    siblingNames.add("Jimmy");
+    siblingNames.add("Jane");
+    expectedPerson.siblingnames = siblingNames;
+
+    assertEquals(Lists.newArrayList(expectedPerson), Lists.newArrayList(personList));
+  }
+
+  @Test
+  public void testGeneric() throws IOException {
+    String genericSchemaJson = Person.SCHEMA$.toString().replace("Person", "GenericPerson");
+    Schema genericPersonSchema = new Schema.Parser().parse(genericSchemaJson);
+    GenericRecord savedRecord = new Record(genericPersonSchema);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), genericPersonSchema);
+
+    Pipeline pipeline = new MRPipeline(TrevniFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Record> genericCollection = pipeline.read(new TrevniKeySource(new Path(avroFile.getAbsolutePath()),
+        Avros.generics(genericPersonSchema)));
+
+    List<Record> recordList = Lists.newArrayList(genericCollection.materialize());
+
+    assertEquals(Lists.newArrayList(savedRecord), Lists.newArrayList(recordList));
+  }
+
+  @Test
+  public void testReflect() throws IOException {
+    AvroType<StringWrapper> strType = Avros.reflects(StringWrapper.class);
+    Schema schema = strType.getSchema();
+    GenericRecord savedRecord = new Record(schema);
+    savedRecord.put("value", "stringvalue");
+    populateGenericFile(Lists.newArrayList(savedRecord), schema);
+
+    Pipeline pipeline = new MRPipeline(TrevniFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<StringWrapper> stringValueCollection = pipeline.read(new TrevniKeySource(new Path(avroFile.getAbsolutePath()),
+        strType));
+
+    List<StringWrapper> recordList = Lists.newArrayList(stringValueCollection.materialize());
+
+    assertEquals(1, recordList.size());
+    StringWrapper stringWrapper = recordList.get(0);
+    assertEquals("stringvalue", stringWrapper.getValue());
+  }
+}


http://git-wip-us.apache.org/repos/asf/crunch/blob/5578d5e8/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniKeyPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniKeyPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniKeyPipelineIT.java
new file mode 100644
index 0000000..cd7fe0b
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/trevni/TrevniKeyPipelineIT.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.crunch.io.avro.trevni;
+
+import com.google.common.collect.Lists;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificData;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.Path;
+import org.apache.trevni.avro.AvroColumnReader;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class TrevniKeyPipelineIT implements Serializable {
+
+  private transient File avroFile;
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Before
+  public void setUp() throws IOException {
+    avroFile = tmpDir.getFile("test.avro.trevni");
+  }
+
+  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
+    FileOutputStream outputStream = new FileOutputStream(this.avroFile);
+    GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(schema);
+
+    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter);
+    dataFileWriter.create(schema, outputStream);
+
+    for (GenericRecord record : genericRecords) {
+      dataFileWriter.append(record);
+    }
+
+    dataFileWriter.close();
+    outputStream.close();
+  }
+
+  @Test
+  public void toAvroTrevniKeyTarget() throws Exception {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(TrevniKeyPipelineIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.records(Person.class)));
+    File outputFile = tmpDir.getFile("output");
+    Target trevniFile = new TrevniKeyTarget(outputFile.getAbsolutePath());
+    pipeline.write(genericCollection, trevniFile);
+    pipeline.run();
+
+    Person person = genericCollection.materialize().iterator().next();
+
+    File trvFile = new File(outputFile, "part-m-00000-part-0.trv");
+
+    AvroColumnReader.Params params = new AvroColumnReader.Params(trvFile);
+    params.setSchema(Person.SCHEMA$);
+    params.setModel(SpecificData.get());
+    AvroColumnReader<Person> reader = new AvroColumnReader<Person>(params);
+
+    try {
+      Person readPerson = reader.next();
+      assertThat(readPerson, is(person));
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void toAvroTrevniKeyMultipleTarget() throws Exception {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(TrevniKeyPipelineIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.records(Person.class)));
+    File output1File = tmpDir.getFile("output1");
+    File output2File = tmpDir.getFile("output2");
+    pipeline.write(genericCollection, new TrevniKeyTarget(output1File.getAbsolutePath()));
+    pipeline.write(genericCollection, new TrevniKeyTarget(output2File.getAbsolutePath()));
+    pipeline.run();
+
+    Person person = genericCollection.materialize().iterator().next();
+
+    File trv1File = new File(output1File, "part-m-00000-part-0.trv");
+    File trv2File = new File(output2File, "part-m-00000-part-0.trv");
+
+    AvroColumnReader.Params params = new AvroColumnReader.Params(trv1File);
+    params.setSchema(Person.SCHEMA$);
+    params.setModel(SpecificData.get());
+    AvroColumnReader<Person> reader = new AvroColumnReader<Person>(params);
+
+    try {
+      Person readPerson = reader.next();
+      assertThat(readPerson, is(person));
+    } finally {
+      reader.close();
+    }
+
+    params = new AvroColumnReader.Params(trv2File);
+    params.setSchema(Person.SCHEMA$);
+    params.setModel(SpecificData.get());
+    reader = new AvroColumnReader<Person>(params);
+
+    try {
+      Person readPerson = reader.next();
+      assertThat(readPerson, is(person));
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void toAvroTrevniKeyTargetReadSource() throws Exception {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(TrevniKeyPipelineIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.records(Person.class)));
+    File outputFile = tmpDir.getFile("output");
+    Target trevniFile = new TrevniKeyTarget(outputFile.getAbsolutePath());
+    pipeline.write(genericCollection, trevniFile);
+    pipeline.run();
+
+    Person person = genericCollection.materialize().iterator().next();
+
+    PCollection<Person> retrievedPeople = pipeline.read(new TrevniKeySource<Person>(
+        new Path(outputFile.toURI()), Avros.records(Person.class)));
+
+    Person retrievedPerson = retrievedPeople.materialize().iterator().next();
+
+    assertThat(retrievedPerson, is(person));
+
+    File trvFile = new File(outputFile, "part-m-00000-part-0.trv");
+
+    AvroColumnReader.Params params = new AvroColumnReader.Params(trvFile);
+    params.setSchema(Person.SCHEMA$);
+    params.setModel(SpecificData.get());
+    AvroColumnReader<Person> reader = new AvroColumnReader<Person>(params);
+
+    try {
+      Person readPerson = reader.next();
+      assertThat(readPerson, is(person));
+    } finally {
+      reader.close();
+    }
+  }
+}


http://git-wip-us.apache.org/repos/asf/crunch/blob/5578d5e8/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniFileReaderFactory.java
new file mode 100644
index 0000000..15bf7c1
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniFileReaderFactory.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro.trevni;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.AutoClosingIterator;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.trevni.Input;
+import org.apache.trevni.avro.AvroColumnReader;
+import org.apache.trevni.avro.HadoopInput;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class TrevniFileReaderFactory<T> implements FileReaderFactory<T> {
+
+  private static final Log LOG = LogFactory.getLog(TrevniFileReaderFactory.class);
+  private final AvroType<T> aType;
+  private final MapFn<T, T> mapFn;
+  private final Schema schema;
+
+  public TrevniFileReaderFactory(AvroType<T> atype) {
+    this.aType = atype;
+    schema = atype.getSchema();
+    this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
+  }
+
+  public TrevniFileReaderFactory(Schema schema) {
+    this.aType = null;
+    this.schema = schema;
+    this.mapFn = IdentityFn.<T>getInstance();
+  }
+
+  static <T> AvroColumnReader<T> getReader(Input input, AvroType<T> avroType, Schema schema) {
+    AvroColumnReader.Params params = new AvroColumnReader.Params(input);
+    params.setSchema(schema);
+    if (avroType.hasReflect()) {
+      if (avroType.hasSpecific()) {
+        Avros.checkCombiningSpecificAndReflectionSchemas();
+      }
+      params.setModel(ReflectData.get());
+    } else if (avroType.hasSpecific()) {
+      params.setModel(SpecificData.get());
+    } else {
+      params.setModel(GenericData.get());
+    }
+
+    try {
+      return new AvroColumnReader<T>(params);
+    } catch (IOException e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
+  @Override
+  public Iterator<T> read(FileSystem fs, final Path path) {
+    this.mapFn.initialize();
+    try {
+      HadoopInput input = new HadoopInput(path, fs.getConf());
+      final AvroColumnReader<T> reader = getReader(input, aType, schema);
+      return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
+        @Override
+        public boolean hasNext() {
+          return reader.hasNext();
+        }
+
+        @Override
+        public T next() {
+          return mapFn.map(reader.next());
+        }
+      });
+    } catch (IOException e) {
+      LOG.info("Could not read avro file at path: " + path, e);
+      return Iterators.emptyIterator();
+    }
+  }
+}


http://git-wip-us.apache.org/repos/asf/crunch/blob/5578d5e8/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java
new file mode 100644
index 0000000..193ac1b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro.trevni;
+
+import org.apache.avro.mapred.AvroJob;
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.trevni.avro.mapreduce.AvroTrevniKeyInputFormat;
+
+import java.io.IOException;
+
+public class TrevniKeySource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
+
+  private static <S> FormatBundle getBundle(AvroType<S> ptype) {
+    FormatBundle bundle = FormatBundle.forInput(AvroTrevniKeyInputFormat.class)
+        .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect()))
+        .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString())
+        .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName());
+    return bundle;
+  }
+
+  public TrevniKeySource(Path path, AvroType<T> ptype) {
+    super(path, ptype, getBundle(ptype));
+  }
+
+  @Override
+  public String toString() {
+    return "TrevniKey(" + path.toString() + ")";
+  }
+
+  @Override
+  public Iterable<T> read(Configuration conf) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    return CompositePathIterable.create(fs, path, new TrevniFileReaderFactory<T>((AvroType<T>) ptype));
+  }
+}


http://git-wip-us.apache.org/repos/asf/crunch/blob/5578d5e8/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java
new file mode 100644
index 0000000..72a0fd3
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySourceTarget.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro.trevni;
+
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.avro.AvroFileTarget;
+import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.hadoop.fs.Path;
+
+public class TrevniKeySourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
+  public TrevniKeySourceTarget(Path path, AvroType<T> atype) {
+    this(path, atype, new SequentialFileNamingScheme());
+  }
+
+  public TrevniKeySourceTarget(Path path, AvroType<T> atype, FileNamingScheme fileNamingScheme) {
+    super(new TrevniKeySource(path, atype), new AvroFileTarget(path), fileNamingScheme);
+  }
+
+  @Override
+  public String toString() {
+    return target.toString();
+  }
+}


http://git-wip-us.apache.org/repos/asf/crunch/blob/5578d5e8/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
new file mode 100644
index 0000000..555aaf4
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro.trevni;
+
+import org.apache.avro.hadoop.io.AvroKeyComparator;
+import org.apache.avro.hadoop.io.AvroSerialization;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.io.CrunchOutputs;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.trevni.avro.mapreduce.AvroTrevniKeyOutputFormat;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.apache.crunch.types.avro.Avros.REFLECT_DATA_FACTORY;
+import static org.apache.crunch.types.avro.Avros.REFLECT_DATA_FACTORY_CLASS;
+
+public class TrevniKeyTarget extends FileTargetImpl {
+
+  public TrevniKeyTarget(String path) {
+    this(new Path(path));
+  }
+
+  public TrevniKeyTarget(Path path) {
+    this(path, new SequentialFileNamingScheme());
+  }
+
+  public TrevniKeyTarget(Path path, FileNamingScheme fileNamingScheme) {
+    super(path, AvroTrevniKeyOutputFormat.class, fileNamingScheme);
+  }
+
+  @Override
+  public String toString() {
+    return "TrevniKey(" + path.toString() + ")";
+  }
+
+  @Override
+  public boolean accept(OutputHandler handler, PType<?> ptype) {
+    if (!(ptype instanceof AvroType)) {
+      return false;
+    }
+    handler.configure(this, ptype);
+    return true;
+  }
+
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    AvroType<?> atype = (AvroType<?>) ptype;
+    Configuration conf = job.getConfiguration();
+
+    if (null == name) {
+      AvroJob.setOutputKeySchema(job, atype.getSchema());
+      AvroJob.setMapOutputKeySchema(job, atype.getSchema());
+
+      Avros.configureReflectDataFactory(conf);
+      configureForMapReduce(job, AvroKey.class, NullWritable.class, AvroTrevniKeyOutputFormat.class,
+          outputPath, name);
+    } else {
+      FormatBundle<AvroTrevniKeyOutputFormat> bundle = FormatBundle.forOutput(
+          AvroTrevniKeyOutputFormat.class);
+
+      bundle.set("avro.schema.output.key", atype.getSchema().toString());
+      bundle.set("mapred.output.value.groupfn.class", AvroKeyComparator.class.getName());
+      bundle.set("mapred.output.key.comparator.class", AvroKeyComparator.class.getName());
+      bundle.set("avro.serialization.key.writer.schema", atype.getSchema().toString());
+      bundle.set("avro.serialization.key.reader.schema", atype.getSchema().toString());
+
+      //Equivalent to...
+      // AvroSerialization.addToConfiguration(job.getConfiguration());
+      Collection<String> serializations = conf.getStringCollection("io.serializations");
+      if (!serializations.contains(AvroSerialization.class.getName())) {
+        serializations.add(AvroSerialization.class.getName());
+        bundle.set(name, StringUtils.arrayToString(serializations.toArray(new String[serializations.size()])));
+      }
+
+      //The following is equivalent to Avros.configureReflectDataFactory(conf);
+      bundle.set(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass().getName());
+
+      //Set output which honors the name.
+      bundle.set("mapred.output.dir", new Path(outputPath, name).toString());
+
+      //Set value which will be ignored but should get past the FileOutputFormat.checkOutputSpecs(..)
+      //which requires the "mapred.output.dir" value to be set.
+      FileOutputFormat.setOutputPath(job, outputPath);
+
+      CrunchOutputs.addNamedOutput(job, name,
+          bundle,
+          AvroKey.class,
+          NullWritable.class);
+    }
+  }
+
+  @Override
+  protected Path getSourcePattern(final Path workingPath, final int index) {
+    //output directories are typically of the form
+    //out#/part-m-#####/part-m-#####/part-#.trv but we don't want both of those folders because it isn't
+    //readable by the TrevniKeySource.
+    return new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index + "*/part-*/part-*");
+  }
+
+  @Override
+  protected Path getDestFile(final Configuration conf, final Path src, final Path dir, final boolean mapOnlyJob) throws IOException {
+    Path outputFilename = super.getDestFile(conf, src, dir, mapOnlyJob);
+    //make sure the dst file is unique in the case there are multiple part-#.trv files per partition.
+    return new Path(outputFilename.toString() + "-" + src.getName());
+  }
+
+  @Override
+  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    if (ptype instanceof AvroType) {
+      return new TrevniKeySourceTarget(path, (AvroType<T>) ptype);
+    }
+    return null;
+  }
+}


http://git-wip-us.apache.org/repos/asf/crunch/blob/5578d5e8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 71f5e0f..acbf66f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,7 +72,7 @@ under the License.
     <commons-httpclient.version>3.0.1</commons-httpclient.version>
     <commons-logging.version>1.1.1</commons-logging.version>
     <commons-cli.version>1.2</commons-cli.version>
-    <avro.version>1.7.0</avro.version>
+    <avro.version>1.7.4</avro.version>
    <javassist.version>3.16.1-GA</javassist.version>
     <jackson.version>1.8.8</jackson.version>
     <protobuf-java.version>2.3.0</protobuf-java.version>
@@ -191,6 +191,22 @@ under the License.
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>trevni-core</artifactId>
+      <version>${avro.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>trevni-avro</artifactId>
+      <version>${avro.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.javassist</groupId>
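
For anyone who wants to try the new Trevni IO support, here is a minimal usage sketch modeled on the
integration tests in this commit. It is only an illustration: the Person Avro record comes from Crunch's
test sources, and the /tmp paths and the TrevniExample class name are placeholders, not part of the change.

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.At;
import org.apache.crunch.io.avro.trevni.TrevniKeySource;
import org.apache.crunch.io.avro.trevni.TrevniKeyTarget;
import org.apache.crunch.test.Person;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class TrevniExample {
  public static void main(String[] args) throws Exception {
    Pipeline pipeline = new MRPipeline(TrevniExample.class, new Configuration());

    // Read an existing Avro file and rewrite it in Trevni's columnar format.
    PCollection<Person> people = pipeline.read(
        At.avroFile("/tmp/people.avro", Avros.records(Person.class)));
    pipeline.write(people, new TrevniKeyTarget("/tmp/people-trevni"));
    pipeline.run();

    // Read the Trevni output back into a PCollection via the new source.
    PCollection<Person> fromTrevni = pipeline.read(
        new TrevniKeySource<Person>(new Path("/tmp/people-trevni"), Avros.records(Person.class)));
    for (Person p : fromTrevni.materialize()) {
      System.out.println(p);
    }
    pipeline.done();
  }
}

As the integration tests above show, the written files end up under the target directory with names like
part-m-00000-part-0.trv, and TrevniKeySource can read that directory back directly.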
