[
https://issues.apache.org/jira/browse/NIFI-4963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16502273#comment-16502273
]
ASF GitHub Bot commented on NIFI-4963:
--------------------------------------
Github user prasanthj commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2755#discussion_r193173275
--- Diff:
nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.TimestampParser;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class NiFiRecordSerDe extends AbstractSerDe {
+
+ // NiFi record source and logger, supplied via the constructor
+ protected RecordReader recordReader;
+ protected ComponentLog log;
+ // Table column names and the row's struct type, populated in initialize() from table properties
+ protected List<String> columnNames;
+ protected StructTypeInfo schema;
+
+ // Object inspector built from the row type in initialize(); parser for configured timestamp formats
+ protected StandardStructObjectInspector cachedObjectInspector;
+ protected TimestampParser tsParser;
+
+ // Matches Hive's internal generated column names such as "_col0"
+ // NOTE(review): no usage of this pattern is visible in this chunk — confirm it is still needed
+ private final static Pattern INTERNAL_PATTERN =
Pattern.compile("_col([0-9]+)");
+
/**
 * Creates a SerDe that deserializes NiFi {@code Record} objects for Hive streaming ingest.
 *
 * @param recordReader the NiFi record reader that supplies incoming records
 * @param log          the component log used for debug output during (de)serialization setup
 */
public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
    this.log = log;
    this.recordReader = recordReader;
}
+
+ @Override
+ public void initialize(Configuration conf, Properties tbl) {
+ List<TypeInfo> columnTypes;
+ StructTypeInfo rowTypeInfo;
+
+ log.debug("Initializing NiFiRecordSerDe: {}",
tbl.entrySet().toArray());
+
+ // Get column names and types
+ String columnNameProperty =
tbl.getProperty(serdeConstants.LIST_COLUMNS);
+ String columnTypeProperty =
tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+ final String columnNameDelimiter =
tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
+ .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) :
String.valueOf(SerDeUtils.COMMA);
+ // all table column names
+ if (columnNameProperty.isEmpty()) {
+ columnNames = new ArrayList<>(0);
+ } else {
+ columnNames = new
ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
+ }
+
+ // all column types
+ if (columnTypeProperty.isEmpty()) {
+ columnTypes = new ArrayList<>(0);
+ } else {
+ columnTypes =
TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+ }
+
+ log.debug("columns: {}, {}", new Object[]{columnNameProperty,
columnNames});
+ log.debug("types: {}, {} ", new Object[]{columnTypeProperty,
columnTypes});
+
+ assert (columnNames.size() == columnTypes.size());
+
+ rowTypeInfo = (StructTypeInfo)
TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+ schema = rowTypeInfo;
+ log.debug("schema : {}", new Object[]{schema});
+ cachedObjectInspector = (StandardStructObjectInspector)
TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
+ tsParser = new
TimestampParser(HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS)));
+ }
+
+ @Override
+ public Class<? extends Writable> getSerializedClass() {
+ return null;
+ }
+
+ @Override
+ public Writable serialize(Object o, ObjectInspector objectInspector)
throws SerDeException {
+ return null;
+ }
+
+ @Override
+ public SerDeStats getSerDeStats() {
+ return null;
+ }
+
+ @Override
+ public Object deserialize(Writable writable) throws SerDeException {
+ ObjectWritable t = (ObjectWritable) writable;
+ Record record = (Record) t.get();
+ List<Object> r = new
ArrayList<>(Collections.nCopies(columnNames.size(), null));
+ try {
+ RecordSchema recordSchema = record.getSchema();
+ for (RecordField field : recordSchema.getFields()) {
+ String fieldName = field.getFieldName();
+ int fpos =
schema.getAllStructFieldNames().indexOf(fieldName.toLowerCase());
--- End diff --
Once the processor is started can the record schema change for different
records?
If not, can this field-name-to-position mapping be moved to init?
> Add support for Hive 3.0 processors
> -----------------------------------
>
> Key: NIFI-4963
> URL: https://issues.apache.org/jira/browse/NIFI-4963
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Matt Burgess
> Assignee: Matt Burgess
> Priority: Major
>
> Apache Hive is working on Hive 3.0, this Jira is to add a bundle of
> components (much like the current Hive bundle) that supports Hive 3.0 (and
> Apache ORC if necessary).
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)