Repository: nifi Updated Branches: refs/heads/master 49285d325 -> 353fcdda9
NIFI-3660: This closes #2356. Support schema containing a map with an array value in ConvertAvroToORC Signed-off-by: joewitt <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/353fcdda Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/353fcdda Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/353fcdda Branch: refs/heads/master Commit: 353fcdda9cd9e4bc42e63a7ddad993ba75bab9b1 Parents: 49285d3 Author: Marco Gaido <[email protected]> Authored: Thu Dec 21 16:28:48 2017 +0100 Committer: joewitt <[email protected]> Committed: Tue Dec 26 17:46:35 2017 -0500 ---------------------------------------------------------------------- .../hadoop/hive/ql/io/orc/NiFiOrcUtils.java | 18 ++-- .../processors/hive/TestConvertAvroToORC.java | 99 +++++++++++++++++++- .../apache/nifi/util/orc/TestNiFiOrcUtils.java | 27 +++++- 3 files changed, 126 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/353fcdda/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java index b8c9dab..c9624b6 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java @@ -40,15 +40,14 @@ import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -147,23 +146,20 @@ public class NiFiOrcUtils { return o; } if (o instanceof Map) { - MapWritable mapWritable = new MapWritable(); + Map map = new HashMap(); TypeInfo keyInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo(); - TypeInfo valueInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo(); + TypeInfo valueInfo = ((MapTypeInfo) typeInfo).getMapValueTypeInfo(); // Unions are not allowed as key/value types, so if we convert the key and value objects, // they should return Writable objects ((Map) o).forEach((key, value) -> { Object keyObject = convertToORCObject(keyInfo, key); Object valueObject = convertToORCObject(valueInfo, value); - if (keyObject == null - || !(keyObject instanceof Writable) - || !(valueObject instanceof Writable) - ) { - throw new IllegalArgumentException("Maps may only contain Writable types, and the key cannot be null"); + if (keyObject == null) { + throw new IllegalArgumentException("Maps' key cannot be null"); } - mapWritable.put((Writable) keyObject, (Writable) valueObject); + map.put(keyObject, valueObject); }); - return mapWritable; + return map; } if (o instanceof GenericData.Record) { GenericData.Record record = (GenericData.Record) o; http://git-wip-us.apache.org/repos/asf/nifi/blob/353fcdda/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java index 1c269fa..282b42d 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java @@ -202,8 +202,6 @@ public class TestConvertAvroToORC { assertTrue(intFieldObject instanceof IntWritable); assertEquals(10, ((IntWritable) intFieldObject).get()); - // This is pretty awkward and messy. The map object is a Map (not a MapWritable) but the keys are writables (in this case Text) - // and so are the values (DoubleWritables in this case). Object mapFieldObject = inspector.getStructFieldData(o, inspector.getStructFieldRef("myMap")); assertTrue(mapFieldObject instanceof Map); Map map = (Map) mapFieldObject; @@ -308,4 +306,101 @@ public class TestConvertAvroToORC { assertTrue(ageObject instanceof IntWritable); assertEquals(28, ((IntWritable) ageObject).get()); } + + @Test + public void test_onTrigger_nested_complex_record() throws Exception { + + Map<String, List<Double>> mapData1 = new TreeMap<String, List<Double>>() {{ + put("key1", Arrays.asList(1.0, 2.0)); + put("key2", Arrays.asList(3.0, 4.0)); + }}; + + Map<String, String> arrayMap11 = new TreeMap<String, String>() {{ + put("key1", "v1"); + put("key2", "v2"); + }}; + Map<String, String> arrayMap12 = new TreeMap<String, String>() {{ + put("key3", "v3"); + put("key4", "v4"); + }}; + + GenericData.Record record = TestNiFiOrcUtils.buildNestedComplexAvroRecord(mapData1, Arrays.asList(arrayMap11, arrayMap12)); + + DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record.getSchema()); + DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + fileWriter.create(record.getSchema(), out); + fileWriter.append(record); + + // Put another record in + Map<String, List<Double>> mapData2 = new TreeMap<String, List<Double>>() {{ + put("key1", Arrays.asList(-1.0, -2.0)); + put("key2", Arrays.asList(-3.0, -4.0)); + }}; + + Map<String, String> arrayMap21 = new TreeMap<String, String>() {{ + put("key1", "v-1"); + put("key2", "v-2"); + }}; + Map<String, String> arrayMap22 = new TreeMap<String, String>() {{ + put("key3", "v-3"); + put("key4", "v-4"); + }}; + + record = TestNiFiOrcUtils.buildNestedComplexAvroRecord(mapData2, Arrays.asList(arrayMap21, arrayMap22)); + fileWriter.append(record); + + fileWriter.flush(); + fileWriter.close(); + out.close(); + + Map<String, String> attributes = new HashMap<String, String>() {{ + put(CoreAttributes.FILENAME.key(), "test"); + }}; + runner.enqueue(out.toByteArray(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1); + + // Write the flow file out to disk, since the ORC Reader needs a path + MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0); + assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS nested_complex_record " + + "(myMapOfArray MAP<STRING, ARRAY<DOUBLE>>, myArrayOfMap ARRAY<MAP<STRING, STRING>>)" + + " STORED AS ORC", resultFlowFile.getAttribute(ConvertAvroToORC.HIVE_DDL_ATTRIBUTE)); + assertEquals("2", resultFlowFile.getAttribute(ConvertAvroToORC.RECORD_COUNT_ATTRIBUTE)); + assertEquals("test.orc", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key())); + byte[] resultContents = runner.getContentAsByteArray(resultFlowFile); + FileOutputStream fos = new FileOutputStream("target/test1.orc"); + fos.write(resultContents); + fos.flush(); + fos.close(); + + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + Reader reader = OrcFile.createReader(new Path("target/test1.orc"), OrcFile.readerOptions(conf).filesystem(fs)); + RecordReader rows = reader.rows(); + Object o = rows.next(null); + assertNotNull(o); + assertTrue(o instanceof OrcStruct); + TypeInfo resultSchema = TestNiFiOrcUtils.buildNestedComplexOrcSchema(); + StructObjectInspector inspector = (StructObjectInspector) OrcStruct.createObjectInspector(resultSchema); + + + // check values + Object myMapOfArray = inspector.getStructFieldData(o, inspector.getStructFieldRef("myMapOfArray")); + assertTrue(myMapOfArray instanceof Map); + Map map = (Map) myMapOfArray; + Object mapValue = map.get(new Text("key1")); + assertNotNull(mapValue); + assertTrue(mapValue instanceof List); + assertEquals(Arrays.asList(new DoubleWritable(1.0), new DoubleWritable(2.0)), mapValue); + + Object myArrayOfMap = inspector.getStructFieldData(o, inspector.getStructFieldRef("myArrayOfMap")); + assertTrue(myArrayOfMap instanceof List); + List list = (List) myArrayOfMap; + Object el0 = list.get(0); + assertNotNull(el0); + assertTrue(el0 instanceof Map); + assertEquals(new Text("v1"), ((Map) el0).get(new Text("key1"))); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/353fcdda/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java index 556723c..342aed9 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java @@ -28,7 +28,6 @@ import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.junit.Test; @@ -202,10 +201,9 @@ public class TestNiFiOrcUtils { map.put("Hello", 1.0f); map.put("World", 2.0f); - Object writable = NiFiOrcUtils.convertToORCObject(TypeInfoUtils.getTypeInfoFromTypeString("map<string,float>"), map); - assertTrue(writable instanceof MapWritable); - MapWritable mapWritable = (MapWritable) writable; - mapWritable.forEach((key, value) -> { + Object convMap = NiFiOrcUtils.convertToORCObject(TypeInfoUtils.getTypeInfoFromTypeString("map<string,float>"), map); + assertTrue(convMap instanceof Map); + ((Map) convMap).forEach((key, value) -> { assertTrue(key instanceof Text); assertTrue(value instanceof FloatWritable); }); @@ -338,6 +336,25 @@ public class TestNiFiOrcUtils { return TypeInfoUtils.getTypeInfoFromTypeString("struct<myInt:int,myMap:map<string,double>,myEnum:string,myLongOrFloat:uniontype<int>,myIntList:array<int>>"); } + public static Schema buildNestedComplexAvroSchema() { + // Build a fake Avro record with nested complex types + final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("nested.complex.record").namespace("any.data").fields(); + builder.name("myMapOfArray").type().map().values().array().items().doubleType().noDefault(); + builder.name("myArrayOfMap").type().array().items().map().values().stringType().noDefault(); + return builder.endRecord(); + } + + public static GenericData.Record buildNestedComplexAvroRecord(Map<String, List<Double>> m, List<Map<String, String>> a) { + Schema schema = buildNestedComplexAvroSchema(); + GenericData.Record row = new GenericData.Record(schema); + row.put("myMapOfArray", m); + row.put("myArrayOfMap", a); + return row; + } + + public static TypeInfo buildNestedComplexOrcSchema() { + return TypeInfoUtils.getTypeInfoFromTypeString("struct<myMapOfArray:map<string,array<double>>,myArrayOfMap:array<map<string,string>>>"); + } private static class TypeInfoCreator { static TypeInfo createInt() {
