CRUNCH-313: Copy the Configuration object used by CrunchInputSplit so it doesn't conflict with settings from the base Configuration.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f65176fa Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f65176fa Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f65176fa Branch: refs/heads/apache-crunch-0.8 Commit: f65176fa344b5c1d720ad20acf5fdcf899ccf741 Parents: 5d06fd4 Author: Josh Wills <[email protected]> Authored: Fri Dec 20 17:15:06 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Fri Jan 3 17:38:41 2014 -0800 ---------------------------------------------------------------------- .../apache/crunch/io/avro/AvroPipelineIT.java | 29 ++++++++++++++++++++ .../crunch/impl/mr/run/CrunchInputSplit.java | 4 +-- 2 files changed, 31 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/f65176fa/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java index 29bf4f5..9eba070 100644 --- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java @@ -16,6 +16,7 @@ */ package org.apache.crunch.io.avro; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.File; @@ -30,13 +31,17 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.commons.io.FileUtils; +import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; import org.apache.crunch.Target; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.At; import org.apache.crunch.io.To; import org.apache.crunch.test.Person; +import org.apache.crunch.test.StringWrapper; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import org.apache.crunch.types.avro.Avros; @@ -92,4 +97,28 @@ public class AvroPipelineIT implements Serializable { String outputString = FileUtils.readFileToString(new File(outputFile, "part-m-00000")); assertTrue(outputString.contains(person.toString())); } + + @Test + public void genericWithReflection() throws Exception { + GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); + + Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration()); + PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), + Avros.records(Person.class))); + PTable<Long, StringWrapper> pt = genericCollection.parallelDo(new MapFn<Person, Pair<Long, StringWrapper>>() { + @Override + public Pair<Long, StringWrapper> map(Person input) { + return Pair.of(1L, new StringWrapper(input.getName().toString())); + } + }, Avros.tableOf(Avros.longs(), Avros.reflects(StringWrapper.class))) + .groupByKey() + .ungroup(); + List<Pair<Long, StringWrapper>> ret = Lists.newArrayList(pt.materialize()); + pipeline.done(); + assertEquals(1, ret.size()); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/f65176fa/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java index bda6f1a..02942bc 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java @@ -54,12 +54,12 @@ class CrunchInputSplit extends InputSplit implements Writable, Configurable { this.inputSplit = inputSplit; this.bundle = bundle; this.nodeIndex = nodeIndex; - this.conf = conf; + this.conf = new Configuration(conf); } @Override public void setConf(Configuration conf) { - this.conf = conf; + this.conf = new Configuration(conf); if (bundle != null && conf != null) { this.bundle.configure(conf); }
