Author: daijy
Date: Wed Mar 11 17:16:32 2015
New Revision: 1665946
URL: http://svn.apache.org/r1665946
Log:
PIG-4448: AvroMapWrapper leaks Avro data types when the map values are complex Avro records
Added:
pig/trunk/test/org/apache/pig/builtin/avro/schema/recordInMap.avsc
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java
pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java
pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
pig/trunk/test/org/apache/pig/impl/util/avro/TestAvroStorageSchemaConversionUtilities.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1665946&r1=1665945&r2=1665946&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Mar 11 17:16:32 2015
@@ -54,6 +54,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4448: AvroMapWrapper leaks Avro data types when the map values are complex avro records (rdsr via daijy)
+
PIG-4453: Remove test-tez-local target (daijy)
PIG-4443: Write inputsplits in Tez to disk if the size is huge and option to
compress pig input splits (rohini)
Modified: pig/trunk/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java?rev=1665946&r1=1665945&r2=1665946&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java Wed Mar 11 17:16:32 2015
@@ -99,7 +99,7 @@ public final class AvroBagWrapper<T> imp
if (arg instanceof IndexedRecord) {
return new AvroTupleWrapper<IndexedRecord>((IndexedRecord)
arg);
} else {
- return
TupleFactory.getInstance().newTuple(AvroTupleWrapper.unionResolver(arg));
+ return
TupleFactory.getInstance().newTuple(AvroTupleWrapper.getPigObject(arg));
}
}
}
Modified: pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java?rev=1665946&r1=1665945&r2=1665946&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java Wed Mar 11 17:16:32 2015
@@ -81,12 +81,7 @@ public final class AvroMapWrapper implem
} else {
v = innerMap.get(key);
}
-
- if (v instanceof Utf8) {
- return v.toString();
- } else {
- return v;
- }
+ return AvroTupleWrapper.getPigObject(v);
}
@Override
Modified: pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java?rev=1665946&r1=1665945&r2=1665946&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java Wed Mar 11 17:16:32 2015
@@ -130,13 +130,17 @@ public final class AvroTupleWrapper <T e
case BYTES:
return new DataByteArray(((ByteBuffer) o).array());
case UNION:
- return unionResolver(o);
+ return getPigObject(o);
default:
return o;
}
}
- public static Object unionResolver(Object o) {
+ /**
+ * @param o An Avro object to convert to an equivalent type in Pig
+ * @return Equivalent Pig object
+ */
+ public static Object getPigObject(Object o) {
if (o instanceof org.apache.avro.util.Utf8) {
return o.toString();
} else if (o instanceof IndexedRecord) {
Added: pig/trunk/test/org/apache/pig/builtin/avro/schema/recordInMap.avsc
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/schema/recordInMap.avsc?rev=1665946&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/schema/recordInMap.avsc (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/schema/recordInMap.avsc Wed Mar 11 17:16:32 2015
@@ -0,0 +1,28 @@
+{
+ "name" : "recordInMap",
+ "namespace" : "org.apache.pig.test.builtin",
+ "type" : "record",
+ "fields" : [
+ {"name" : "key", "type" : "string"},
+ {"name" : "value", "type" : "int"},
+ {
+ "name" : "parameters",
+ "type": {
+ "type": "map",
+ "values": {
+ "type": "record",
+ "name": "A",
+ "fields": [
+ {
+ "name": "id",
+ "type": [
+ "null",
+ "int"
+ ]
+ }
+ ]
+ }
+ }
+ }
+ ]
+}
Modified:
pig/trunk/test/org/apache/pig/impl/util/avro/TestAvroStorageSchemaConversionUtilities.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/impl/util/avro/TestAvroStorageSchemaConversionUtilities.java?rev=1665946&r1=1665945&r2=1665946&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/impl/util/avro/TestAvroStorageSchemaConversionUtilities.java (original)
+++ pig/trunk/test/org/apache/pig/impl/util/avro/TestAvroStorageSchemaConversionUtilities.java Wed Mar 11 17:16:32 2015
@@ -17,14 +17,18 @@
*/
package org.apache.pig.impl.util.avro;
+import com.google.common.collect.ImmutableMap;
import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
import org.apache.pig.ResourceSchema;
+import org.apache.pig.data.Tuple;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.util.Map;
public class TestAvroStorageSchemaConversionUtilities {
final private static String BASE_DIR =
"test/org/apache/pig/builtin/avro/schema/";
@@ -43,6 +47,32 @@ public class TestAvroStorageSchemaConver
parse(BASE_DIR + "nullableArrayInMap.avsc", true));
}
+ /**
+ * Test verifies that Avro records as map values are correctly converted
to tuples
+ */
+ @Test
+ public void testRecordAsMapValue() throws IOException {
+ final Schema schema = new Schema.Parser().parse(new File(BASE_DIR,
"recordInMap.avsc"));
+ final GenericData.Record record = new GenericData.Record(schema);
+ record.put("key", "k");
+ record.put("value", 42);
+ final Schema valueSchema =
schema.getField("parameters").schema().getValueType();
+ final GenericData.Record valueRecord = new
GenericData.Record(valueSchema);
+ valueRecord.put("id", 1);
+ record.put("parameters", ImmutableMap.of("record_in_map",
valueRecord));
+ final Tuple tuple = new AvroTupleWrapper<GenericData.Record>(record);
+ // Third parameter is the map
+ final Object o = tuple.get(2);
+ Assert.assertTrue(o instanceof Map);
+ final Map<CharSequence, Object> map = (Map<CharSequence, Object>) o;
+ final Object recordInMap = map.get("record_in_map");
+ // Assert that the record got converted to a tuple
+ Assert.assertTrue(recordInMap instanceof Tuple);
+ final Tuple tuple2 = (Tuple) recordInMap;
+ final Object id = tuple2.get(0);
+ Assert.assertEquals(1, id);
+ }
+
private static String parse(String schema, boolean recursive) throws
IOException {
final Schema s = new Schema.Parser().parse(new File(schema));
final ResourceSchema resourceSchema =
AvroStorageSchemaConversionUtilities.avroSchemaToResourceSchema(s, recursive);