Repository: nifi
Updated Branches:
  refs/heads/master d1ab17580 -> 9ee2316ff


NIFI-5491: Fixed PutHive3Streaming handling of Byte, Short, and Struct

This closes #2938.

Signed-off-by: Mark Payne <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9ee2316f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9ee2316f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9ee2316f

Branch: refs/heads/master
Commit: 9ee2316ff676acc413f28f86e8d9924947cb63ea
Parents: d1ab175
Author: Matthew Burgess <[email protected]>
Authored: Mon Aug 6 13:34:39 2018 -0400
Committer: Mark Payne <[email protected]>
Committed: Mon Aug 6 14:33:49 2018 -0400

----------------------------------------------------------------------
 .../apache/hive/streaming/NiFiRecordSerDe.java  | 57 +++++++++++---
 .../processors/hive/TestPutHive3Streaming.java  | 82 ++++++++++++++++++++
 .../src/test/resources/datatype_test.avsc       | 39 ++++++++++
 3 files changed, 167 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9ee2316f/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
index d4b444a..0f15096 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
@@ -158,7 +158,7 @@ public class NiFiRecordSerDe extends AbstractSerDe {
             stats.setRowCount(stats.getRowCount() + 1);
 
         } catch (Exception e) {
-            log.warn("Error [{}] parsing Record [{}].", new 
Object[]{e.getLocalizedMessage(), t}, e);
+            log.warn("Error [{}] parsing Record [{}].", new 
Object[]{e.toString(), t}, e);
             throw new SerDeException(e);
         }
 
@@ -166,14 +166,15 @@ public class NiFiRecordSerDe extends AbstractSerDe {
     }
 
     /**
-     * Utility method to extract current expected field from given JsonParser
-     * isTokenCurrent is a boolean variable also passed in, which determines
-     * if the JsonParser is already at the token we expect to read next, or
-     * needs advancing to the next before we read.
+     * Utility method to extract current expected field from given record.
      */
-    private Object extractCurrentField(Record record, RecordField field, 
TypeInfo fieldTypeInfo) {
+    @SuppressWarnings("unchecked")
+    private Object extractCurrentField(Record record, RecordField field, 
TypeInfo fieldTypeInfo) throws SerDeException {
         Object val;
-        String fieldName = (field != null) ? field.getFieldName() : null;
+        if (field == null) {
+            return null;
+        }
+        String fieldName = field.getFieldName();
 
         switch (fieldTypeInfo.getCategory()) {
             case PRIMITIVE:
@@ -182,9 +183,15 @@ public class NiFiRecordSerDe extends AbstractSerDe {
                     primitiveCategory = ((PrimitiveTypeInfo) 
fieldTypeInfo).getPrimitiveCategory();
                 }
                 switch (primitiveCategory) {
-                    case INT:
                     case BYTE:
+                        Integer bIntValue = record.getAsInt(fieldName);
+                        val = bIntValue == null ? null : bIntValue.byteValue();
+                        break;
                     case SHORT:
+                        Integer sIntValue = record.getAsInt(fieldName);
+                        val = sIntValue == null ? null : 
sIntValue.shortValue();
+                        break;
+                    case INT:
                         val = record.getAsInt(fieldName);
                         break;
                     case LONG:
@@ -205,7 +212,11 @@ public class NiFiRecordSerDe extends AbstractSerDe {
                         val = record.getAsString(fieldName);
                         break;
                     case BINARY:
-                        val = 
AvroTypeUtil.convertByteArray(record.getAsArray(fieldName)).array();
+                        Object[] array = record.getAsArray(fieldName);
+                        if (array == null) {
+                            return null;
+                        }
+                        val = AvroTypeUtil.convertByteArray(array).array();
                         break;
                     case DATE:
                         val = record.getAsDate(fieldName, 
field.getDataType().getFormat());
@@ -227,8 +238,32 @@ public class NiFiRecordSerDe extends AbstractSerDe {
                 val = 
DataTypeUtils.convertRecordFieldtoObject(record.getValue(fieldName), 
field.getDataType());
                 break;
             case STRUCT:
-                val = 
DataTypeUtils.convertRecordFieldtoObject(record.getValue(fieldName), 
field.getDataType());
-                break;
+                // The Hive StandardStructObjectInspector expects the object 
corresponding to a "struct" to be an array or List rather than a Map.
+                // Do the conversion here, calling extractCurrentField 
recursively to traverse any nested structs.
+                Record nestedRecord = (Record) record.getValue(fieldName);
+                if (nestedRecord == null) {
+                    return null;
+                }
+                try {
+                    RecordSchema recordSchema = nestedRecord.getSchema();
+                    List<RecordField> recordFields = recordSchema.getFields();
+                    if (recordFields == null || recordFields.isEmpty()) {
+                        return Collections.emptyList();
+                    }
+                    // This List will hold the values of the entries in the Map
+                    List<Object> structList = new 
ArrayList<>(recordFields.size());
+                    StructTypeInfo typeInfo = (StructTypeInfo) 
schema.getStructFieldTypeInfo(fieldName);
+                    for (RecordField nestedRecordField : recordFields) {
+                        String fName = nestedRecordField.getFieldName();
+                        String normalizedFieldName = fName.toLowerCase();
+                        structList.add(extractCurrentField(nestedRecord, 
nestedRecordField, typeInfo.getStructFieldTypeInfo(normalizedFieldName)));
+                    }
+                    return structList;
+                } catch (Exception e) {
+                    log.warn("Error [{}] parsing Record [{}].", new 
Object[]{e.toString(), nestedRecord}, e);
+                    throw new SerDeException(e);
+                }
+                // break unreachable
             default:
                 log.error("Unknown type found: " + fieldTypeInfo + "for field 
of type: " + field.getDataType().toString());
                 return null;

http://git-wip-us.apache.org/repos/asf/nifi/blob/9ee2316f/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
index cfc6017..da463e6 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
@@ -57,7 +57,9 @@ import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.MockFlowFile;
@@ -82,6 +84,8 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
 import java.util.function.BiFunction;
 
 import static 
org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
@@ -649,6 +653,84 @@ public class TestPutHive3Streaming {
     }
 
     @Test
+    public void testDataTypeConversions() throws Exception {
+        final String avroSchema = IOUtils.toString(new 
FileInputStream("src/test/resources/datatype_test.avsc"), 
StandardCharsets.UTF_8);
+        schema = new Schema.Parser().parse(avroSchema);
+
+        processor.setFields(Arrays.asList(
+                new FieldSchema("uuid", serdeConstants.STRING_TYPE_NAME, 
"uuid"),
+                new FieldSchema("stringc", serdeConstants.STRING_TYPE_NAME, 
"stringc"),
+                new FieldSchema("intc", serdeConstants.INT_TYPE_NAME, "intc"),
+                new FieldSchema("tinyintc", serdeConstants.TINYINT_TYPE_NAME, 
"tinyintc"),
+                new FieldSchema("smallintc", 
serdeConstants.SMALLINT_TYPE_NAME, "smallintc"),
+                new FieldSchema("bigintc", serdeConstants.BIGINT_TYPE_NAME, 
"bigintc"),
+                new FieldSchema("booleanc", serdeConstants.BOOLEAN_TYPE_NAME, 
"booleanc"),
+                new FieldSchema("floatc", serdeConstants.FLOAT_TYPE_NAME, 
"floatc"),
+                new FieldSchema("doublec", serdeConstants.DOUBLE_TYPE_NAME, 
"doublec"),
+                new FieldSchema("listc", serdeConstants.LIST_TYPE_NAME + "<" + 
serdeConstants.STRING_TYPE_NAME + ">", "listc"),
+                new FieldSchema("structc", serdeConstants.STRUCT_TYPE_NAME
+                        + "<sint:" + serdeConstants.INT_TYPE_NAME + ","
+                        + "sboolean:" + serdeConstants.BOOLEAN_TYPE_NAME + ","
+                        + "sstring:" + serdeConstants.STRING_TYPE_NAME + ">", 
"structc"),
+                new FieldSchema("enumc", serdeConstants.STRING_TYPE_NAME, 
"enumc")));
+
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, 
TEST_CONF_PATH);
+        MockRecordParser readerFactory = new MockRecordParser();
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+        for (final RecordField recordField : recordSchema.getFields()) {
+            readerFactory.addSchemaField(recordField.getFieldName(), 
recordField.getDataType().getFieldType(), recordField.isNullable());
+        }
+
+        List<String> enumc = Arrays.asList("SPADES", "HEARTS", "DIAMONDS", 
"CLUBS");
+        Random r = new Random();
+        for (int index = 0; index < 10; index++) {
+            final int i = index;
+            Record mapRecord = new 
MapRecord(AvroTypeUtil.createSchema(schema.getField("structc").schema().getTypes().get(1)),
 // Get non-null type in union
+                    new HashMap<String, Object>() {
+                {
+                    put("sint", i + 2); // {"name": "sint", "type": "int"},
+                    if (i % 3 == 2) {
+                        put("sboolean", null);
+                    } else {
+                        put("sboolean", i % 3 == 1); // {"name": "sboolean", 
"type": ["null","boolean"]},
+                    }
+                    put("sstring", "world"); // {"name": "sstring", "type": 
"string"}
+                }
+            });
+            readerFactory.addRecord(
+                    UUID.randomUUID(), // {"name": "uuid", "type": "string"},
+                    "hello", // {"name": "stringc", "type": "string"},
+                    i, // {"name": "intc", "type": "int"},
+                    i + 1, // {"name": "tinyintc", "type": ["null", "int"]},
+                    i * 10, // {"name": "smallintc", "type": "int"},
+                    i * Integer.MAX_VALUE, // {"name": "bigintc", "type": 
"long"},
+                    i % 2 == 0, // {"name": "booleanc", "type": "boolean"},
+                    i * 100.0f, // {"name": "floatc", "type": "float"},
+                    i * 100.0, // {"name": "doublec", "type": "double"},
+                    new String[]{"a", "b"}, // {"name": "listc", "type": 
["null", {"type": "array", "items": "string"}]},
+                    mapRecord,
+                    enumc.get(r.nextInt(4)) // {"name": "enumc", "type": 
{"type": "enum", "name": "Suit", "symbols": 
["SPADES","HEARTS","DIAMONDS","CLUBS"]}}
+            );
+        }
+
+        runner.addControllerService("mock-reader-factory", readerFactory);
+        runner.enableControllerService(readerFactory);
+        runner.setProperty(PutHive3Streaming.RECORD_READER, 
"mock-reader-factory");
+
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, 
"thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
+        assertEquals("10", 
flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+        assertEquals("default.users", 
flowFile.getAttribute(ATTR_OUTPUT_TABLES));
+    }
+
+    @Test
     public void cleanup() {
         processor.cleanup();
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9ee2316f/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/datatype_test.avsc
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/datatype_test.avsc
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/datatype_test.avsc
new file mode 100644
index 0000000..a232ad0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/datatype_test.avsc
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ {
+ "namespace": "nifi",
+ "name": "test",
+ "type": "record",
+ "fields": [
+  {"name": "uuid", "type": "string"},
+  {"name": "stringc", "type": "string"},
+  {"name": "intc", "type": "int"},
+  {"name": "tinyintc", "type": ["null", "int"]},
+  {"name": "smallintc", "type": "int"},
+  {"name": "bigintc", "type": "long"},
+  {"name": "booleanc", "type": "boolean"},
+  {"name": "floatc", "type": "float"},
+  {"name": "doublec", "type": "double"},
+  {"name": "listc", "type": ["null", {"type": "array", "items": "string"}]},
+  {"name": "structc", "type": ["null", {"name": "structcRecord", "type": 
"record", "fields": [
+       {"name": "sint", "type": "int"},
+       {"name": "sboolean", "type": ["null","boolean"]},
+    {"name": "sstring", "type": "string"}
+   ]}]},
+  {"name": "enumc", "type": {"type": "enum", "name": "Suit", "symbols": 
["SPADES","HEARTS","DIAMONDS","CLUBS"]}}
+ ]
+}
\ No newline at end of file

Reply via email to