[ https://issues.apache.org/jira/browse/NIFI-4963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16502318#comment-16502318 ]
ASF GitHub Bot commented on NIFI-4963: -------------------------------------- Github user prasanthj commented on a diff in the pull request: https://github.com/apache/nifi/pull/2755#discussion_r193184650 --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hive.streaming; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.ObjectWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hive.common.util.HiveStringUtils; +import org.apache.hive.common.util.TimestampParser; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class NiFiRecordSerDe extends AbstractSerDe { + + protected RecordReader recordReader; + protected ComponentLog log; + protected List<String> columnNames; + protected StructTypeInfo schema; + + protected StandardStructObjectInspector 
cachedObjectInspector; + protected TimestampParser tsParser; + + private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)"); + + public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) { + this.recordReader = recordReader; + this.log = log; + } + + @Override + public void initialize(Configuration conf, Properties tbl) { + List<TypeInfo> columnTypes; + StructTypeInfo rowTypeInfo; + + log.debug("Initializing NiFiRecordSerDe: {}", tbl.entrySet().toArray()); + + // Get column names and types + String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS); + String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); + final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl + .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA); + // all table column names + if (columnNameProperty.isEmpty()) { + columnNames = new ArrayList<>(0); + } else { + columnNames = new ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter))); + } + + // all column types + if (columnTypeProperty.isEmpty()) { + columnTypes = new ArrayList<>(0); + } else { + columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); + } + + log.debug("columns: {}, {}", new Object[]{columnNameProperty, columnNames}); + log.debug("types: {}, {} ", new Object[]{columnTypeProperty, columnTypes}); + + assert (columnNames.size() == columnTypes.size()); + + rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes); + schema = rowTypeInfo; + log.debug("schema : {}", new Object[]{schema}); + cachedObjectInspector = (StandardStructObjectInspector) TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo); + tsParser = new TimestampParser(HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS))); + } + + @Override + public Class<? 
extends Writable> getSerializedClass() { + return null; + } + + @Override + public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException { + return null; + } + + @Override + public SerDeStats getSerDeStats() { + return null; + } + + @Override + public Object deserialize(Writable writable) throws SerDeException { + ObjectWritable t = (ObjectWritable) writable; + Record record = (Record) t.get(); + List<Object> r = new ArrayList<>(Collections.nCopies(columnNames.size(), null)); + try { + RecordSchema recordSchema = record.getSchema(); + for (RecordField field : recordSchema.getFields()) { + String fieldName = field.getFieldName(); --- End diff -- Ignored. Static partitions don't have to be in the record since it's already specified in the API. > Add support for Hive 3.0 processors > ----------------------------------- > > Key: NIFI-4963 > URL: https://issues.apache.org/jira/browse/NIFI-4963 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions > Reporter: Matt Burgess > Assignee: Matt Burgess > Priority: Major > > Apache Hive is working on Hive 3.0; this Jira is to add a bundle of > components (much like the current Hive bundle) that supports Hive 3.0 (and > Apache ORC if necessary). -- This message was sent by Atlassian JIRA (v7.6.3#76005)