Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2755#discussion_r193182827
  
    --- Diff: 
nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
 ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hive.streaming;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hive.conf.HiveConf;
    +import org.apache.hadoop.hive.serde.serdeConstants;
    +import org.apache.hadoop.hive.serde2.AbstractSerDe;
    +import org.apache.hadoop.hive.serde2.SerDeException;
    +import org.apache.hadoop.hive.serde2.SerDeStats;
    +import org.apache.hadoop.hive.serde2.SerDeUtils;
    +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    +import 
org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    +import 
org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
    +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.io.ObjectWritable;
    +import org.apache.hadoop.io.Writable;
    +import org.apache.hive.common.util.HiveStringUtils;
    +import org.apache.hive.common.util.TimestampParser;
    +import org.apache.nifi.avro.AvroTypeUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +public class NiFiRecordSerDe extends AbstractSerDe {
    +
    // Source of NiFi records to be deserialized into Hive rows.
    protected RecordReader recordReader;
    // NiFi component logger supplied by the owning processor.
    protected ComponentLog log;
    // Table column names, in Hive declaration order (populated in initialize()).
    protected List<String> columnNames;
    // Struct type describing the full Hive row (populated in initialize()).
    protected StructTypeInfo schema;

    // Object inspector derived from the row schema, cached once at initialize() time.
    protected StandardStructObjectInspector cachedObjectInspector;
    // Parser honoring any custom timestamp formats from the table properties.
    protected TimestampParser tsParser;

    // Matches Hive's internal auto-generated column names of the form "_col<N>".
    private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");

    /**
     * Creates a SerDe that deserializes NiFi {@link Record}s for Hive streaming.
     *
     * @param recordReader the NiFi record reader supplying records
     * @param log          the component logger for debug output
     */
    public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
        this.recordReader = recordReader;
        this.log = log;
    }
    +
    +    @Override
    +    public void initialize(Configuration conf, Properties tbl) {
    +        List<TypeInfo> columnTypes;
    +        StructTypeInfo rowTypeInfo;
    +
    +        log.debug("Initializing NiFiRecordSerDe: {}", 
tbl.entrySet().toArray());
    +
    +        // Get column names and types
    +        String columnNameProperty = 
tbl.getProperty(serdeConstants.LIST_COLUMNS);
    +        String columnTypeProperty = 
tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
    +        final String columnNameDelimiter = 
tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
    +                .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : 
String.valueOf(SerDeUtils.COMMA);
    +        // all table column names
    +        if (columnNameProperty.isEmpty()) {
    +            columnNames = new ArrayList<>(0);
    +        } else {
    +            columnNames = new 
ArrayList<>(Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
    +        }
    +
    +        // all column types
    +        if (columnTypeProperty.isEmpty()) {
    +            columnTypes = new ArrayList<>(0);
    +        } else {
    +            columnTypes = 
TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
    +        }
    +
    +        log.debug("columns: {}, {}", new Object[]{columnNameProperty, 
columnNames});
    +        log.debug("types: {}, {} ", new Object[]{columnTypeProperty, 
columnTypes});
    +
    +        assert (columnNames.size() == columnTypes.size());
    +
    +        rowTypeInfo = (StructTypeInfo) 
TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
    +        schema = rowTypeInfo;
    +        log.debug("schema : {}", new Object[]{schema});
    +        cachedObjectInspector = (StandardStructObjectInspector) 
TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
    +        tsParser = new 
TimestampParser(HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS)));
    +    }
    +
    +    @Override
    +    public Class<? extends Writable> getSerializedClass() {
    +        return null;
    +    }
    +
    +    @Override
    +    public Writable serialize(Object o, ObjectInspector objectInspector) 
throws SerDeException {
    +        return null;
    +    }
    +
    /**
     * No statistics are collected by this SerDe.
     * NOTE(review): returning null presumably means "no stats available" —
     * confirm the Hive streaming call sites tolerate a null here.
     */
    @Override
    public SerDeStats getSerDeStats() {
        return null;
    }
    +
    +    @Override
    +    public Object deserialize(Writable writable) throws SerDeException {
    +        ObjectWritable t = (ObjectWritable) writable;
    +        Record record = (Record) t.get();
    +        List<Object> r = new 
ArrayList<>(Collections.nCopies(columnNames.size(), null));
    +        try {
    +            RecordSchema recordSchema = record.getSchema();
    +            for (RecordField field : recordSchema.getFields()) {
    +                String fieldName = field.getFieldName();
    --- End diff --
    
    Yes. At the very least this should happen once per file and not per record.
    For the unpartitioned or statically partitioned case, this should not happen at all,
as there are no partition columns to extract (static partition values go into the API),
so it should use a fast path without these checks and allocations.


---

Reply via email to