CRUNCH-16: Restore compatibility with Avro 1.5.4 in the current Crunch codebase
Signed-off-by: Josh Wills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/b05f1836 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/b05f1836 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/b05f1836 Branch: refs/heads/master Commit: b05f1836cb4ee4c9bf34b202d57a536f251f82bc Parents: 6531106 Author: jwills <[email protected]> Authored: Sat Aug 18 14:52:19 2012 -0700 Committer: Josh Wills <[email protected]> Committed: Tue Aug 21 18:54:29 2012 -0700 ---------------------------------------------------------------------- .../scrunch/ScalaSafeReflectDatumReader.java | 3 +- .../src/it/java/org/apache/crunch/lib/SortIT.java | 1 + .../apache/crunch/lib/SpecificAvroGroupByIT.java | 16 ------ .../java/org/apache/crunch/types/avro/Avros.java | 43 ++++++++++++++- 4 files changed, 44 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b05f1836/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java index 75db945..80f265c 100644 --- a/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java +++ b/crunch-scrunch/src/main/java/org/apache/crunch/scrunch/ScalaSafeReflectDatumReader.java @@ -26,6 +26,7 @@ import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.io.ResolvingDecoder; import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.hadoop.util.ReflectionUtils; import scala.collection.JavaConversions; @@ -111,7 +112,7 @@ public class ScalaSafeReflectDatumReader<T> extends 
ReflectDatumReader<T> { collectionClass.isAssignableFrom(ArrayList.class)) { return new ArrayList(); } - return data.newInstance(collectionClass, schema); + return ReflectionUtils.newInstance(collectionClass, null); } Class elementClass = ScalaSafeReflectData.getClassProp(schema, ScalaSafeReflectData.ELEMENT_PROP); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b05f1836/crunch/src/it/java/org/apache/crunch/lib/SortIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java index dc025a7..4a22a51 100644 --- a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java +++ b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java @@ -150,6 +150,7 @@ public class SortIT implements Serializable { @Test public void testAvroReflectSortPair() throws IOException { Pipeline pipeline = new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()); + pipeline.enableDebug(); PCollection<Pair<String, StringWrapper>> sorted = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt")) .parallelDo(new MapFn<String, Pair<String, StringWrapper>>() { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b05f1836/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java b/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java index 7b61813..5167dec 100644 --- a/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java +++ b/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java @@ -26,7 +26,6 @@ import java.io.Serializable; import java.util.List; import org.apache.avro.file.DataFileWriter; -import org.apache.avro.mapred.AvroJob; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.crunch.MapFn; import 
org.apache.crunch.PCollection; @@ -70,22 +69,7 @@ public class SpecificAvroGroupByIT implements Serializable { @Test public void testGrouByWithSpecificAvroType() throws Exception { - - MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByIT.class, tmpDir.getDefaultConfiguration()); - - testSpecificAvro(pipeline); - } - - @Test - public void testGrouByOnSpecificAvroButReflectionDatumReader() throws Exception { MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByIT.class, tmpDir.getDefaultConfiguration()); - - // https://issues.apache.org/jira/browse/AVRO-1046 resolves - // the ClassCastException when reading specific Avro types with - // ReflectDatumReader - - pipeline.getConfiguration().setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, true); - testSpecificAvro(pipeline); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b05f1836/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java index 9f4a21d..24391ed 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java @@ -628,11 +628,50 @@ public class Avros { @Override public int hashCode() { - return ReflectData.get().hashCode(this, getSchema()); + return reflectAwareHashCode(this, getSchema()); } - } + /* + * TODO: Remove this once we no longer have to support 1.5.4. 
+ */ + private static int reflectAwareHashCode(Object o, Schema s) { + if (o == null) return 0; // incomplete datum + int hashCode = 1; + switch (s.getType()) { + case RECORD: + for (Schema.Field f : s.getFields()) { + if (f.order() == Schema.Field.Order.IGNORE) + continue; + hashCode = hashCodeAdd(hashCode, + ReflectData.get().getField(o, f.name(), f.pos()), f.schema()); + } + return hashCode; + case ARRAY: + Collection<?> a = (Collection<?>)o; + Schema elementType = s.getElementType(); + for (Object e : a) + hashCode = hashCodeAdd(hashCode, e, elementType); + return hashCode; + case UNION: + return reflectAwareHashCode( + o, s.getTypes().get(ReflectData.get().resolveUnion(s, o))); + case ENUM: + return s.getEnumOrdinal(o.toString()); + case NULL: + return 0; + case STRING: + return (o instanceof Utf8 ? o : new Utf8(o.toString())).hashCode(); + default: + return o.hashCode(); + } + } + + /** Add the hash code for an object into an accumulated hash code. */ + private static int hashCodeAdd(int hashCode, Object o, Schema s) { + return 31*hashCode + reflectAwareHashCode(o, s); + } + private Avros() { } }
