Repository: crunch Updated Branches: refs/heads/master e3141009a -> 48e4e7941
CRUNCH-485: Support Avro-related byte equality etc. checks inside of crunch-spark Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/48e4e794 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/48e4e794 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/48e4e794 Branch: refs/heads/master Commit: 48e4e79412b83013bea3e8a7723d4a7917f6ce49 Parents: e314100 Author: Josh Wills <[email protected]> Authored: Fri Jan 9 13:22:27 2015 -0800 Committer: Josh Wills <[email protected]> Committed: Fri Jan 9 13:22:27 2015 -0800 ---------------------------------------------------------------------- .../org/apache/crunch/impl/spark/ByteArray.java | 14 ++-- .../crunch/impl/spark/ByteArrayHelper.java | 84 ++++++++++++++++++++ .../apache/crunch/impl/spark/IntByteArray.java | 8 +- .../crunch/impl/spark/fn/MapOutputFunction.java | 4 +- .../spark/fn/PartitionedMapOutputFunction.java | 2 +- .../crunch/impl/spark/serde/AvroSerDe.java | 8 +- .../apache/crunch/impl/spark/serde/SerDe.java | 3 +- .../crunch/impl/spark/serde/WritableSerDe.java | 6 +- .../crunch/impl/spark/AvroByteArrayTest.java | 55 +++++++++++++ 9 files changed, 163 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/48e4e794/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArray.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArray.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArray.java index c86835c..db9e3c9 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArray.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArray.java @@ -17,34 +17,32 @@ */ package org.apache.crunch.impl.spark; -import com.google.common.primitives.UnsignedBytes; - import java.io.Serializable; -import java.util.Arrays; public class ByteArray implements Serializable, Comparable<ByteArray> { public final byte[] value; + protected final ByteArrayHelper helper; - public ByteArray(byte[] value) { + public ByteArray(byte[] value, ByteArrayHelper helper) { this.value = value; + this.helper = helper; } @Override public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; ByteArray byteArray = (ByteArray) o; - if (!Arrays.equals(value, byteArray.value)) return false; - return true; + return helper.equal(value, byteArray.value); } @Override public int hashCode() { - return value != null ? Arrays.hashCode(value) : 0; + return helper.hashCode(value); } @Override public int compareTo(ByteArray other) { - return UnsignedBytes.lexicographicalComparator().compare(value, other.value); + return helper.compare(value, other.value); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/48e4e794/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArrayHelper.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArrayHelper.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArrayHelper.java new file mode 100644 index 0000000..a87cb66 --- /dev/null +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArrayHelper.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.spark; + +import com.google.common.primitives.UnsignedBytes; +import org.apache.avro.Schema; +import org.apache.avro.io.BinaryData; + +import java.io.Serializable; +import java.util.Arrays; + +public abstract class ByteArrayHelper implements Serializable { + + public static final ByteArrayHelper WRITABLES = new ByteArrayHelper() { + @Override + boolean equal(byte[] left, byte[] right) { + return Arrays.equals(left, right); + } + + @Override + int hashCode(byte[] value) { + return value != null ? Arrays.hashCode(value) : 0; + } + + @Override + int compare(byte[] left, byte[] right) { + return UnsignedBytes.lexicographicalComparator().compare(left, right); + } + }; + + public static ByteArrayHelper forAvroSchema(Schema schema) { + return new AvroByteArrayHelper(schema); + } + + abstract boolean equal(byte[] left, byte[] right); + abstract int hashCode(byte[] value); + abstract int compare(byte[] left, byte[] right); + + static class AvroByteArrayHelper extends ByteArrayHelper { + private String jsonSchema; + private transient Schema schema; + + public AvroByteArrayHelper(Schema schema) { + this.jsonSchema = schema.toString(); + } + + private Schema getSchema() { + if (schema == null) { + schema = new Schema.Parser().parse(jsonSchema); + } + return schema; + } + + @Override + boolean equal(byte[] left, byte[] right) { + return compare(left, right) == 0; + } + + @Override + int hashCode(byte[] value) { + return BinaryData.hashCode(value, 0, value.length, getSchema()); + } + + @Override + int compare(byte[] left, byte[] right) { + return BinaryData.compare(left, 0, right, 0, getSchema()); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/48e4e794/crunch-spark/src/main/java/org/apache/crunch/impl/spark/IntByteArray.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/IntByteArray.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/IntByteArray.java index 9af70ed..dfb9ed6 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/IntByteArray.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/IntByteArray.java @@ -17,13 +17,11 @@ */ package org.apache.crunch.impl.spark; -import java.io.Serializable; - -public class IntByteArray extends ByteArray implements Serializable { +public class IntByteArray extends ByteArray { public final int partition; - public IntByteArray(int partition, byte[] bytes) { - super(bytes); + public IntByteArray(int partition, ByteArray delegate) { + super(delegate.value, delegate.helper); this.partition = partition; } http://git-wip-us.apache.org/repos/asf/crunch/blob/48e4e794/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java index b8cd7c6..080806c 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java @@ -36,7 +36,7 @@ public class MapOutputFunction<K, V> implements PairFunction<Pair<K, V>, ByteArr @Override public Tuple2<ByteArray, byte[]> call(Pair<K, V> p) throws Exception { return new Tuple2<ByteArray, byte[]>( - new ByteArray(keySerde.toBytes(p.first())), - valueSerde.toBytes(p.second())); + keySerde.toBytes(p.first()), + valueSerde.toBytes(p.second()).value); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/48e4e794/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java index e88217d..e104b27 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java @@ -64,7 +64,7 @@ public class PartitionedMapOutputFunction<K, V> implements PairFunction<Pair<K, int partition = getPartitioner().getPartition(p.first(), p.second(), numPartitions); return new Tuple2<IntByteArray, byte[]>( new IntByteArray(partition, keySerde.toBytes(p.first())), - valueSerde.toBytes(p.second())); + valueSerde.toBytes(p.second()).value); } private Partitioner getPartitioner() { http://git-wip-us.apache.org/repos/asf/crunch/blob/48e4e794/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java index f82ba8e..cef60e3 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java @@ -30,6 +30,8 @@ import org.apache.avro.reflect.ReflectDatumReader; import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.crunch.impl.spark.ByteArray; +import org.apache.crunch.impl.spark.ByteArrayHelper; import org.apache.crunch.types.avro.AvroMode; import org.apache.crunch.types.avro.AvroType; import org.apache.crunch.types.avro.Avros; @@ -44,6 +46,7 @@ public class AvroSerDe<T> implements SerDe<T> { private AvroType<T> avroType; private Map<String, String> modeProperties; + private ByteArrayHelper helper; private transient AvroMode mode; private transient DatumWriter<T> writer; private transient DatumReader<T> reader; @@ -54,6 +57,7 @@ public class AvroSerDe<T> implements SerDe<T> { if (avroType.hasReflect() && avroType.hasSpecific()) { Avros.checkCombiningSpecificAndReflectionSchemas(); } + this.helper = ByteArrayHelper.forAvroSchema(avroType.getSchema()); } private AvroMode getMode() { @@ -85,13 +89,13 @@ public class AvroSerDe<T> implements SerDe<T> { } @Override - public byte[] toBytes(T obj) throws Exception { + public ByteArray toBytes(T obj) throws Exception { ByteArrayOutputStream out = new ByteArrayOutputStream(); Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); getWriter().write(obj, encoder); encoder.flush(); out.close(); - return out.toByteArray(); + return new ByteArray(out.toByteArray(), helper); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/48e4e794/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java index 887f656..354f348 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java @@ -18,13 +18,14 @@ package org.apache.crunch.impl.spark.serde; import com.google.common.base.Function; +import org.apache.crunch.impl.spark.ByteArray; import org.apache.hadoop.conf.Configuration; import java.io.Serializable; public interface SerDe<T> extends Serializable { - byte[] toBytes(T obj) throws Exception; + ByteArray toBytes(T obj) throws Exception; T fromBytes(byte[] bytes); http://git-wip-us.apache.org/repos/asf/crunch/blob/48e4e794/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/WritableSerDe.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/WritableSerDe.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/WritableSerDe.java index d90007d..55f7404 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/WritableSerDe.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/WritableSerDe.java @@ -18,6 +18,8 @@ package org.apache.crunch.impl.spark.serde; import com.google.common.base.Function; +import org.apache.crunch.impl.spark.ByteArray; +import org.apache.crunch.impl.spark.ByteArrayHelper; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; @@ -37,12 +39,12 @@ public class WritableSerDe implements SerDe<Writable> { } @Override - public byte[] toBytes(Writable obj) throws Exception { + public ByteArray toBytes(Writable obj) throws Exception { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos); obj.write(dos); dos.close(); - return baos.toByteArray(); + return new ByteArray(baos.toByteArray(), ByteArrayHelper.WRITABLES); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/48e4e794/crunch-spark/src/test/java/org/apache/crunch/impl/spark/AvroByteArrayTest.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/test/java/org/apache/crunch/impl/spark/AvroByteArrayTest.java b/crunch-spark/src/test/java/org/apache/crunch/impl/spark/AvroByteArrayTest.java new file mode 100644 index 0000000..ec6dd94 --- /dev/null +++ b/crunch-spark/src/test/java/org/apache/crunch/impl/spark/AvroByteArrayTest.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.spark; + +import com.google.common.collect.Lists; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Field.Order; +import org.apache.avro.Schema.Type; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.crunch.impl.spark.serde.AvroSerDe; +import org.apache.crunch.types.avro.Avros; +import org.codehaus.jackson.node.JsonNodeFactory; +import org.junit.Test; +import static org.junit.Assert.assertEquals; + +public class AvroByteArrayTest { + @Test + public void fieldsWithIgnoredSortOrderAreNotUsedInEquals() throws Exception { + Schema mySchema = Schema.createRecord("foo", "", "", false); + mySchema.setFields(Lists.newArrayList(new Field("field1", + Schema.create(Type.STRING), + null, + JsonNodeFactory.instance.textNode(""), + Order.ASCENDING), new Field("field2", + Schema.create(Type.STRING), + null, + JsonNodeFactory.instance.textNode(""), + Order.IGNORE))); + + GenericRecordBuilder myGRB = new GenericRecordBuilder(mySchema); + Record myRecord1 = myGRB.set("field1", "hello").set("field2", "world").build(); + Record myRecord2 = myGRB.set("field1", "hello").set("field2", "there").build(); + assertEquals(myRecord1, myRecord2); + + AvroSerDe serde = new AvroSerDe(Avros.generics(mySchema), null); + assertEquals(serde.toBytes(myRecord1), serde.toBytes(myRecord2)); + } +}
