Repository: nifi
Updated Branches:
  refs/heads/master 49285d325 -> 353fcdda9


NIFI-3660: This closes #2356. Support schemas containing a map with an array value in ConvertAvroToORC

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/353fcdda
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/353fcdda
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/353fcdda

Branch: refs/heads/master
Commit: 353fcdda9cd9e4bc42e63a7ddad993ba75bab9b1
Parents: 49285d3
Author: Marco Gaido <[email protected]>
Authored: Thu Dec 21 16:28:48 2017 +0100
Committer: joewitt <[email protected]>
Committed: Tue Dec 26 17:46:35 2017 -0500

----------------------------------------------------------------------
 .../hadoop/hive/ql/io/orc/NiFiOrcUtils.java     | 18 ++--
 .../processors/hive/TestConvertAvroToORC.java   | 99 +++++++++++++++++++-
 .../apache/nifi/util/orc/TestNiFiOrcUtils.java  | 27 +++++-
 3 files changed, 126 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/353fcdda/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
index b8c9dab..c9624b6 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
@@ -40,15 +40,14 @@ import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -147,23 +146,20 @@ public class NiFiOrcUtils {
                 return o;
             }
             if (o instanceof Map) {
-                MapWritable mapWritable = new MapWritable();
+                Map map = new HashMap();
                 TypeInfo keyInfo = ((MapTypeInfo) 
typeInfo).getMapKeyTypeInfo();
-                TypeInfo valueInfo = ((MapTypeInfo) 
typeInfo).getMapKeyTypeInfo();
+                TypeInfo valueInfo = ((MapTypeInfo) 
typeInfo).getMapValueTypeInfo();
                 // Unions are not allowed as key/value types, so if we convert 
the key and value objects,
                 // they should return Writable objects
                 ((Map) o).forEach((key, value) -> {
                     Object keyObject = convertToORCObject(keyInfo, key);
                     Object valueObject = convertToORCObject(valueInfo, value);
-                    if (keyObject == null
-                            || !(keyObject instanceof Writable)
-                            || !(valueObject instanceof Writable)
-                            ) {
-                        throw new IllegalArgumentException("Maps may only 
contain Writable types, and the key cannot be null");
+                    if (keyObject == null) {
+                        throw new IllegalArgumentException("Maps' key cannot 
be null");
                     }
-                    mapWritable.put((Writable) keyObject, (Writable) 
valueObject);
+                    map.put(keyObject, valueObject);
                 });
-                return mapWritable;
+                return map;
             }
             if (o instanceof GenericData.Record) {
                 GenericData.Record record = (GenericData.Record) o;

http://git-wip-us.apache.org/repos/asf/nifi/blob/353fcdda/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
index 1c269fa..282b42d 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
@@ -202,8 +202,6 @@ public class TestConvertAvroToORC {
         assertTrue(intFieldObject instanceof IntWritable);
         assertEquals(10, ((IntWritable) intFieldObject).get());
 
-        // This is pretty awkward and messy. The map object is a Map (not a 
MapWritable) but the keys are writables (in this case Text)
-        // and so are the values (DoubleWritables in this case).
         Object mapFieldObject = inspector.getStructFieldData(o, 
inspector.getStructFieldRef("myMap"));
         assertTrue(mapFieldObject instanceof Map);
         Map map = (Map) mapFieldObject;
@@ -308,4 +306,101 @@ public class TestConvertAvroToORC {
         assertTrue(ageObject instanceof IntWritable);
         assertEquals(28, ((IntWritable) ageObject).get());
     }
+
+    @Test
+    public void test_onTrigger_nested_complex_record() throws Exception {
+
+        Map<String, List<Double>> mapData1 = new TreeMap<String, 
List<Double>>() {{
+            put("key1", Arrays.asList(1.0, 2.0));
+            put("key2", Arrays.asList(3.0, 4.0));
+        }};
+
+        Map<String, String> arrayMap11 = new TreeMap<String, String>() {{
+            put("key1", "v1");
+            put("key2", "v2");
+        }};
+        Map<String, String> arrayMap12 = new TreeMap<String, String>() {{
+            put("key3", "v3");
+            put("key4", "v4");
+        }};
+
+        GenericData.Record record = 
TestNiFiOrcUtils.buildNestedComplexAvroRecord(mapData1, 
Arrays.asList(arrayMap11, arrayMap12));
+
+        DatumWriter<GenericData.Record> writer = new 
GenericDatumWriter<>(record.getSchema());
+        DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(writer);
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        fileWriter.create(record.getSchema(), out);
+        fileWriter.append(record);
+
+        // Put another record in
+        Map<String, List<Double>> mapData2 = new TreeMap<String, 
List<Double>>() {{
+            put("key1", Arrays.asList(-1.0, -2.0));
+            put("key2", Arrays.asList(-3.0, -4.0));
+        }};
+
+        Map<String, String> arrayMap21 = new TreeMap<String, String>() {{
+            put("key1", "v-1");
+            put("key2", "v-2");
+        }};
+        Map<String, String> arrayMap22 = new TreeMap<String, String>() {{
+            put("key3", "v-3");
+            put("key4", "v-4");
+        }};
+
+        record = TestNiFiOrcUtils.buildNestedComplexAvroRecord(mapData2, 
Arrays.asList(arrayMap21, arrayMap22));
+        fileWriter.append(record);
+
+        fileWriter.flush();
+        fileWriter.close();
+        out.close();
+
+        Map<String, String> attributes = new HashMap<String, String>() {{
+            put(CoreAttributes.FILENAME.key(), "test");
+        }};
+        runner.enqueue(out.toByteArray(), attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1);
+
+        // Write the flow file out to disk, since the ORC Reader needs a path
+        MockFlowFile resultFlowFile = 
runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
+        assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS 
nested_complex_record " +
+                "(myMapOfArray MAP<STRING, ARRAY<DOUBLE>>, myArrayOfMap 
ARRAY<MAP<STRING, STRING>>)"
+                + " STORED AS ORC", 
resultFlowFile.getAttribute(ConvertAvroToORC.HIVE_DDL_ATTRIBUTE));
+        assertEquals("2", 
resultFlowFile.getAttribute(ConvertAvroToORC.RECORD_COUNT_ATTRIBUTE));
+        assertEquals("test.orc", 
resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
+        byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
+        FileOutputStream fos = new FileOutputStream("target/test1.orc");
+        fos.write(resultContents);
+        fos.flush();
+        fos.close();
+
+        Configuration conf = new Configuration();
+        FileSystem fs = FileSystem.getLocal(conf);
+        Reader reader = OrcFile.createReader(new Path("target/test1.orc"), 
OrcFile.readerOptions(conf).filesystem(fs));
+        RecordReader rows = reader.rows();
+        Object o = rows.next(null);
+        assertNotNull(o);
+        assertTrue(o instanceof OrcStruct);
+        TypeInfo resultSchema = TestNiFiOrcUtils.buildNestedComplexOrcSchema();
+        StructObjectInspector inspector = (StructObjectInspector) 
OrcStruct.createObjectInspector(resultSchema);
+
+
+        // check values
+        Object myMapOfArray = inspector.getStructFieldData(o, 
inspector.getStructFieldRef("myMapOfArray"));
+        assertTrue(myMapOfArray instanceof Map);
+        Map map = (Map) myMapOfArray;
+        Object mapValue = map.get(new Text("key1"));
+        assertNotNull(mapValue);
+        assertTrue(mapValue instanceof List);
+        assertEquals(Arrays.asList(new DoubleWritable(1.0), new 
DoubleWritable(2.0)), mapValue);
+
+        Object myArrayOfMap = inspector.getStructFieldData(o, 
inspector.getStructFieldRef("myArrayOfMap"));
+        assertTrue(myArrayOfMap instanceof List);
+        List list = (List) myArrayOfMap;
+        Object el0 = list.get(0);
+        assertNotNull(el0);
+        assertTrue(el0 instanceof Map);
+        assertEquals(new Text("v1"), ((Map) el0).get(new Text("key1")));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/353fcdda/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
index 556723c..342aed9 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.junit.Test;
 
@@ -202,10 +201,9 @@ public class TestNiFiOrcUtils {
         map.put("Hello", 1.0f);
         map.put("World", 2.0f);
 
-        Object writable = 
NiFiOrcUtils.convertToORCObject(TypeInfoUtils.getTypeInfoFromTypeString("map<string,float>"),
 map);
-        assertTrue(writable instanceof MapWritable);
-        MapWritable mapWritable = (MapWritable) writable;
-        mapWritable.forEach((key, value) -> {
+        Object convMap = 
NiFiOrcUtils.convertToORCObject(TypeInfoUtils.getTypeInfoFromTypeString("map<string,float>"),
 map);
+        assertTrue(convMap instanceof Map);
+        ((Map) convMap).forEach((key, value) -> {
             assertTrue(key instanceof Text);
             assertTrue(value instanceof FloatWritable);
         });
@@ -338,6 +336,25 @@ public class TestNiFiOrcUtils {
         return 
TypeInfoUtils.getTypeInfoFromTypeString("struct<myInt:int,myMap:map<string,double>,myEnum:string,myLongOrFloat:uniontype<int>,myIntList:array<int>>");
     }
 
+    public static Schema buildNestedComplexAvroSchema() {
+        // Build a fake Avro record with nested complex types
+        final SchemaBuilder.FieldAssembler<Schema> builder = 
SchemaBuilder.record("nested.complex.record").namespace("any.data").fields();
+        
builder.name("myMapOfArray").type().map().values().array().items().doubleType().noDefault();
+        
builder.name("myArrayOfMap").type().array().items().map().values().stringType().noDefault();
+        return builder.endRecord();
+    }
+
+    public static GenericData.Record buildNestedComplexAvroRecord(Map<String, 
List<Double>> m, List<Map<String, String>> a) {
+        Schema schema = buildNestedComplexAvroSchema();
+        GenericData.Record row = new GenericData.Record(schema);
+        row.put("myMapOfArray", m);
+        row.put("myArrayOfMap", a);
+        return row;
+    }
+
+    public static TypeInfo buildNestedComplexOrcSchema() {
+        return 
TypeInfoUtils.getTypeInfoFromTypeString("struct<myMapOfArray:map<string,array<double>>,myArrayOfMap:array<map<string,string>>>");
+    }
 
     private static class TypeInfoCreator {
         static TypeInfo createInt() {

Reply via email to