tpalfy commented on a change in pull request #3684: NIFI-6295: Refactored NiFiRecordSerDe to handle nested complex types URL: https://github.com/apache/nifi/pull/3684#discussion_r323125610
########## File path: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/hive/streaming/TestNiFiRecordSerDe.java ########## @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.streaming; + +import org.apache.hadoop.hive.common.type.Date; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.Timestamp; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.io.ObjectWritable; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.MockComponentLog; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; 
+import java.util.Properties; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class TestNiFiRecordSerDe { Review comment: I think in general this test could benefit from some changes: 1. Use more declarative assertions by defining an "expected" data structure 2. Slim the curly bracketed structures. Usually it's better for the observer if a new line starts a significantly new concept. I played around with this, here's what I came up with, please use as much of it as you'd like: ```java /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
*/ package org.apache.hive.streaming; import org.apache.hadoop.hive.common.type.Date; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.common.type.Timestamp; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.io.ObjectWritable; import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.util.MockComponentLog; import org.junit.Test; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; public class TestNiFiRecordSerDe { @Test public void testSimpleFields() throws SerDeException { NiFiRecordSerDe serDe = createSerDe( "bytec,shortc,intc,longc,boolc,floatc,doublec,stringc,varcharc,charc,datec,timestampc,decimalc", "tinyint:smallint:int:bigint:boolean:float:double:string:varchar(50):char(1):date:timestamp:decimal" ); RecordSchema schema = new SimpleRecordSchema( Arrays.asList( new RecordField("bytec", RecordFieldType.BYTE.getDataType()), new RecordField("shortc", RecordFieldType.SHORT.getDataType()), new RecordField("intc", RecordFieldType.INT.getDataType()), new RecordField("longc", RecordFieldType.LONG.getDataType()), new RecordField("boolc", RecordFieldType.BOOLEAN.getDataType()), new RecordField("floatc", RecordFieldType.FLOAT.getDataType()), new RecordField("doublec", RecordFieldType.DOUBLE.getDataType()), new RecordField("stringc", 
RecordFieldType.STRING.getDataType()), new RecordField("varcharc", RecordFieldType.STRING.getDataType()), new RecordField("charc", RecordFieldType.CHAR.getDataType()), new RecordField("datec", RecordFieldType.DATE.getDataType("yyyy-MM-dd")), new RecordField("timestampc", RecordFieldType.TIMESTAMP.getDataType("yyyy-MM-dd HH:mm:ss")), new RecordField("decimalc", RecordFieldType.DOUBLE.getDataType()) ) ); long currentTimeMillis = System.currentTimeMillis(); HashMap<String, Object> input = new HashMap<>() {{ put("bytec", Byte.valueOf((byte) 2)); put("shortc", Short.valueOf((short) 45)); put("intc", Integer.valueOf(95)); put("longc", Long.valueOf(876L)); put("boolc", Boolean.TRUE); put("floatc", 4.56f); put("doublec", 2.3445); put("stringc", "test"); put("varcharc", "test2"); put("charc", 'c'); put("datec", new java.sql.Date(currentTimeMillis)); put("timestampc", new java.sql.Timestamp(currentTimeMillis)); put("decimalc", 0.45); }}; Date date = new Date(); date.setTimeInMillis(currentTimeMillis); Timestamp ts = new Timestamp(); ts.setTimeInMillis(currentTimeMillis); List<Object> expected = Arrays.asList( Byte.valueOf("2"), Short.valueOf("45"), 95, 876L, Boolean.TRUE, 4.56f, 2.3445, "test", "test2", "c", date, ts, HiveDecimal.create("0.45") ); Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, input))); assertEquals(expected, deserialized); } @Test public void testArrays() throws SerDeException { NiFiRecordSerDe serDe = createSerDe( "binaryc,binaryc2", "binary:binary" ); RecordSchema schema = new SimpleRecordSchema( Arrays.asList( new RecordField("binaryc", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())), new RecordField("binaryc2", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())) ) ); HashMap<String, Object> input = new HashMap<>() {{ put("binaryc", new byte[]{1, 2}); put("binaryc2", "Hello"); }}; Object[] expected = new Object[]{ new byte[]{1, 2}, 
"Hello".getBytes(StandardCharsets.UTF_8) }; Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, input))); assertArrayEquals(expected, ((List)deserialized).toArray()); } @Test public void testStructField() throws SerDeException{ NiFiRecordSerDe serDe = createSerDe("structc", "struct<age:int,name:string>" ); RecordSchema innerSchema = new SimpleRecordSchema(Arrays.asList( new RecordField("age", RecordFieldType.INT.getDataType()), new RecordField("name", RecordFieldType.STRING.getDataType()) )); RecordSchema schema = new SimpleRecordSchema(Arrays.asList( new RecordField("structc", RecordFieldType.RECORD.getRecordDataType(innerSchema)) )); HashMap<String, Object> value = new HashMap<>() {{ put("structc", new MapRecord(innerSchema, new HashMap<>() {{ put("age", 15); put("name", "gideon"); }})); }}; List<Object> expected = Arrays.asList( Arrays.asList(15, "gideon") ); Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, value))); assertEquals(expected, deserialized); } @Test public void testSimpleArray() throws SerDeException{ long now = System.currentTimeMillis(); Date hiveDate = new Date(); hiveDate.setTimeInMillis(now); Timestamp hiveTs = new Timestamp(); hiveTs.setTimeInMillis(now); testSimpleArray("tinyint", RecordFieldType.BYTE.getDataType(), new Byte[] { 5, 29 }, new Byte[] { 5, 29 }); testSimpleArray("smallint", RecordFieldType.SHORT.getDataType(), new Short[] { 5, 29 }, new Short[] { 5, 29 }); testSimpleArray("int", RecordFieldType.INT.getDataType(), new Object[] { 1, 2, 3 ,4, 5 }, new Object[] { 1, 2, 3, 4, 5 }); testSimpleArray("bigint", RecordFieldType.LONG.getDataType(), new Object[] { 298767L, 89876L }, new Object[] { 298767L, 89876L }); testSimpleArray("boolean", RecordFieldType.BOOLEAN.getDataType(), new Object[] { true, false }, new Object[] { true, false }); testSimpleArray("float", RecordFieldType.FLOAT.getDataType(), new Object[] { 1.23f, 3.14f }, new Object[] { 1.23f, 3.14f }); 
testSimpleArray("double", RecordFieldType.DOUBLE.getDataType(), new Object[] { 1.235, 3.142, 1.0 }, new Object[] { 1.235, 3.142, 1.0 }); testSimpleArray("string", RecordFieldType.STRING.getDataType(), new Object[] { "sasa", "wewe" }, new Object[] { "sasa", "wewe" }); testSimpleArray("varchar(20)", RecordFieldType.STRING.getDataType(), new Object[] { "niko", "fiti", "sema"}, new Object[] { "niko", "fiti", "sema" }); testSimpleArray("char(1)", RecordFieldType.CHAR.getDataType(), new Object[] { 'a', 'b', 'c' }, new Object[] { "a", "b", "c"}); testSimpleArray("date", RecordFieldType.DATE.getDataType(), new Object[] { new java.sql.Date(now)}, new Object[] { hiveDate }); testSimpleArray("timestamp", RecordFieldType.TIMESTAMP.getDataType(), new Object[] { new java.sql.Timestamp(now)}, new Object[] { hiveTs }); testSimpleArray("decimal(10,2)", RecordFieldType.DOUBLE.getDataType(), new Object[] { 3.45, 1.25 }, new Object[] { HiveDecimal.create(3.45), HiveDecimal.create(1.25)}); } public void testSimpleArray(String typeName, DataType elementDataType, Object[] values, Object[] expected) throws SerDeException { NiFiRecordSerDe serDe = createSerDe("listc", "array<" + typeName + ">" ); RecordSchema schema = new SimpleRecordSchema(Arrays.asList( new RecordField("listc", RecordFieldType.ARRAY.getArrayDataType(elementDataType)) )); Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, new HashMap<>() {{ put("listc", values); }}))); List<Object> fields = (List<Object>)deserialized; assertEquals(1, fields.size()); List<Object> nested = (List<Object>) fields.get(0); for(int i=0; i<expected.length; i++){ assertEquals(expected[i], nested.get(i)); } } @Test public void testStructArray() throws SerDeException{ NiFiRecordSerDe serDe = createSerDe("listc", "array<struct<age:int,name:string>>" ); RecordSchema innerSchema = new SimpleRecordSchema(Arrays.asList( new RecordField("age", RecordFieldType.INT.getDataType()), new RecordField("name", 
RecordFieldType.STRING.getDataType()) )); RecordSchema schema = new SimpleRecordSchema(Arrays.asList( new RecordField("listc", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(innerSchema))) )); HashMap<String, Object> input = new HashMap<>() {{ put("listc", new Record[]{new MapRecord(innerSchema, new HashMap<>() {{ put("age", 15); put("name", "gideon"); }}), new MapRecord(innerSchema, new HashMap<>() {{ put("age", 87); put("name", "cucu"); }}) }); }}; Object expected = Arrays.asList( Arrays.asList( Arrays.asList(15, "gideon"), Arrays.asList(87, "cucu") ) ); Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, input))); assertEquals(expected, deserialized); } @Test public void testSimpleMap() throws SerDeException{ testSimpleMap("string", "tinyint", RecordFieldType.BYTE.getDataType(), createMap((byte)89, (byte)2), objectMap(createMap((byte)89, (byte)2))); testSimpleMap("string", "smallint", RecordFieldType.SHORT.getDataType(), createMap((short)89, (short)209), objectMap(createMap((short)89, (short)209))); testSimpleMap("string", "int", RecordFieldType.INT.getDataType(), createMap(90, 87), objectMap(createMap(90, 87))); testSimpleMap("string", "bigint", RecordFieldType.BIGINT.getDataType(), createMap(87888L, 876L, 123L), objectMap(createMap(87888L, 876L, 123L))); testSimpleMap("string", "boolean", RecordFieldType.BOOLEAN.getDataType(), createMap(false, true, true, false), objectMap(createMap(false, true, true, false))); testSimpleMap("string", "float", RecordFieldType.FLOAT.getDataType(), createMap(1.2f, 8.6f, 0.125f), objectMap(createMap(1.2f, 8.6f, 0.125f))); testSimpleMap("string", "double", RecordFieldType.DOUBLE.getDataType(), createMap(3.142, 8.93), objectMap(createMap(3.142, 8.93))); testSimpleMap("string", "string", RecordFieldType.STRING.getDataType(), createMap("form", "ni", "aje"), objectMap(createMap("form", "ni", "aje"))); testSimpleMap("string", "varchar(20)", 
RecordFieldType.STRING.getDataType(), createMap("niko", "kiza"), objectMap(createMap("niko", "kiza"))); testSimpleMap("string", "char(1)", RecordFieldType.CHAR.getDataType(), createMap('a', 'b', 'c'), objectMap(createMap("a", "b", "c"))); long now = System.currentTimeMillis(); Date hiveDate = new Date(); hiveDate.setTimeInMillis(now); Timestamp hiveTs = new Timestamp(); hiveTs.setTimeInMillis(now); testSimpleMap("string", "date", RecordFieldType.DATE.getDataType(), createMap(new java.sql.Date(now)), objectMap(createMap(hiveDate))); testSimpleMap("string", "timestamp", RecordFieldType.TIMESTAMP.getDataType(), createMap(new java.sql.Timestamp(now)), objectMap(createMap(hiveTs))); testSimpleMap("string", "decimal(10,2)", RecordFieldType.DOUBLE.getDataType(), createMap(45.6, 2345.5), objectMap(createMap( HiveDecimal.create(45.6), HiveDecimal.create(2345.5) ))); } private Map<String,Object> createMap(Object... keyValues){ Map<String,Object> map = new HashMap<>(keyValues.length); for(int i=0; i<keyValues.length; i++){ map.put("key." 
+ i, keyValues[i]); } return map; } private Map<Object,Object> objectMap(Map<String,Object> input){ HashMap<Object,Object> map = new HashMap<>(); map.putAll(input); return map; } private void testSimpleMap(String keyType, String valueType, DataType fieldType, Map<String, Object> fields, Map<Object, Object> expected) throws SerDeException{ NiFiRecordSerDe serDe = createSerDe("mapc", "map<" + keyType + "," + valueType + ">" ); RecordSchema schema = new SimpleRecordSchema(Arrays.asList( new RecordField("mapc", RecordFieldType.MAP.getMapDataType(fieldType)) )); Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, new HashMap<String, Object>(){ { put("mapc", fields); } }))); List<Object> desFields = (List<Object>)deserialized; assertEquals(1, desFields.size()); Map<Object,Object> map = (Map<Object, Object>)desFields.get(0); for(Map.Entry<Object, Object> entry: expected.entrySet()){ assertEquals(entry.getValue(), map.get(entry.getKey())); } } @Test public void testStructMap() throws SerDeException{ NiFiRecordSerDe serDe = createSerDe( "mapc", "map<string,struct<id:int,balance:decimal(18,2)>>" ); RecordSchema recordSchema = new SimpleRecordSchema(Arrays.asList( new RecordField("id", RecordFieldType.INT.getDataType()), new RecordField("balance", RecordFieldType.DOUBLE.getDataType()) )); RecordSchema schema = new SimpleRecordSchema(Arrays.asList( new RecordField("mapc", RecordFieldType.MAP.getMapDataType(RecordFieldType.RECORD.getRecordDataType(recordSchema))) )); HashMap<String, Object> input = new HashMap<>() {{ put("mapc", new HashMap<String, Object>() {{ put("current", new MapRecord(recordSchema, new HashMap<>() {{ put("id", 1); put("balance", 56.9); }})); put("savings", new MapRecord(recordSchema, new HashMap<>() {{ put("id", 2); put("balance", 104.65); }})); }}); }}; Object expected = Arrays.asList( new HashMap<>(){{ put("current", Arrays.asList(1, HiveDecimal.create(56.9))); put("savings", Arrays.asList(2, HiveDecimal.create(104.65))); }} 
); Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, input))); assertEquals(expected, deserialized); } NiFiRecordSerDe createSerDe(String columnNames, String typeInfo) throws SerDeException{ Properties props = new Properties(); props.setProperty(serdeConstants.LIST_COLUMNS, columnNames); props.setProperty(serdeConstants.LIST_COLUMN_TYPES, typeInfo); NiFiRecordSerDe serDe = new NiFiRecordSerDe(null, new MockComponentLog("logger", new Object())); //reader isn't used serDe.initialize(null, props); //conf isn't used return serDe; } } ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services