[ 
https://issues.apache.org/jira/browse/NIFI-1663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15344480#comment-15344480
 ] 

ASF GitHub Bot commented on NIFI-1663:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/477#discussion_r68073235
  
    --- Diff: 
nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java
 ---
    @@ -0,0 +1,303 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hive;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.commons.lang3.mutable.MutableInt;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.StreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.hive.HiveJdbcCommon;
    +import org.apache.nifi.util.orc.OrcFlowFileWriter;
    +import org.apache.nifi.util.orc.OrcUtils;
    +import org.apache.orc.CompressionKind;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.TypeDescription;
    +
    +import java.io.BufferedInputStream;
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * The ConvertAvroToORC processor takes an Avro-formatted flow file as 
input and converts it into ORC format.
    + */
    +@SideEffectFree
    +@SupportsBatching
    +@Tags({"avro", "orc", "hive", "convert"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Converts an Avro record into ORC file format. This 
processor provides a direct mapping of an Avro record to an ORC record, such "
    +        + "that the resulting ORC file will have the same hierarchical 
structure as the Avro document. If an incoming FlowFile contains a stream of "
    +        + "multiple Avro records, the resultant FlowFile will contain a 
ORC file containing all of the Avro records.  If an incoming FlowFile does "
    +        + "not contain any records, an empty ORC file is the output.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime type to application/octet-stream"),
    +        @WritesAttribute(attribute = "filename", description = "Sets the 
filename to the existing filename with the extension replaced by / added to by 
.orc"),
    +        @WritesAttribute(attribute = "record.count", description = "Sets 
the number of records in the ORC file."),
    +        @WritesAttribute(attribute = "hive.ddl", description = "Creates a 
partial Hive DDL statement for creating a table in Hive from this ORC file. "
    +                + "This can be used in ReplaceText for setting the content 
to the DDL. To make it valid DDL, add \"LOCATION 
'<path_to_orc_file_in_hdfs>'\", where "
    +                + "the path is the directory that contains this ORC file 
on HDFS. For example, ConvertAvroToORC can send flow files to a PutHDFS 
processor to send the file to "
    +                + "HDFS, then to a ReplaceText to set the content to this 
DDL (plus the LOCATION clause as described), then to PutHiveQL processor to 
create the table "
    +                + "if it doesn't exist.")
    +})
    +public class ConvertAvroToORC extends AbstractProcessor {
    +
    +    // Attributes
    +    public static final String ORC_MIME_TYPE = "application/octet-stream";
    +    public static final String HIVE_DDL_ATTRIBUTE = "hive.ddl";
    +    public static final String RECORD_COUNT_ATTRIBUTE = "record.count";
    +
    +
    +    // Properties
    +    public static final PropertyDescriptor ORC_CONFIGURATION_RESOURCES = 
new PropertyDescriptor.Builder()
    +            .name("orc-config-resources")
    +            .displayName("ORC Configuration Resources")
    +            .description("A file or comma separated list of files which 
contains the ORC configuration (hive-site.xml, e.g.). Without this, Hadoop "
    +                    + "will search the classpath for a 'hive-site.xml' 
file or will revert to a default configuration. Please see the ORC 
documentation for more details.")
    +            
.required(false).addValidator(HiveJdbcCommon.createMultipleFilesExistValidator()).build();
    +
    +    public static final PropertyDescriptor STRIPE_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("orc-stripe-size")
    +            .displayName("Stripe Size")
    +            .description("The size of the memory buffer (in bytes) for 
writing stripes to an ORC file")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("100000")
    +            .build();
    +
    +    public static final PropertyDescriptor BUFFER_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("orc-buffer-size")
    +            .displayName("Buffer Size")
    +            .description("The maximum size of the memory buffers (in 
bytes) used for compressing and storing a stripe in memory. This is a hint to 
the ORC writer, "
    +                    + "which may choose to use a smaller buffer size based 
on stripe size and number of columns for efficient stripe writing and memory 
utilization.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("10000")
    +            .build();
    +
    +    public static final PropertyDescriptor COMPRESSION_TYPE = new 
PropertyDescriptor.Builder()
    +            .name("orc-compression-type")
    +            .displayName("Compression Type")
    +            .required(true)
    +            .allowableValues("NONE", "ZLIB", "SNAPPY", "LZO")
    +            .defaultValue("NONE")
    +            .build();
    +
    +    public static final PropertyDescriptor HIVE_TABLE_NAME = new 
PropertyDescriptor.Builder()
    +            .name("orc-hive-table-name")
    +            .displayName("Hive Table Name")
    +            .description("An optional table name to insert into the 
hive.ddl attribute. The generated DDL can be used by "
    +                    + "a PutHiveQL processor (presumably after a PutHDFS 
processor) to create a table backed by the converted ORC file. "
    +                    + "If this property is not provided, the full name 
(including namespace) of the incoming Avro record will be normalized "
    +                    + "and used as the table name.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    // Relationships
    +    static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("A FlowFile is routed to this relationship after 
it has been converted to ORC format.")
    +            .build();
    +    static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("A FlowFile is routed to this relationship if it 
cannot be parsed as Avro or cannot be converted to ORC for any reason")
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +    private final static Set<Relationship> relationships;
    +
    +    private volatile Configuration orcConfig;
    +
    +    /*
    +     * Will ensure that the list of property descriptors is built only 
once.
    +     * Will also create a Set of relationships
    +     */
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.add(ORC_CONFIGURATION_RESOURCES);
    +        _propertyDescriptors.add(STRIPE_SIZE);
    +        _propertyDescriptors.add(BUFFER_SIZE);
    +        _propertyDescriptors.add(COMPRESSION_TYPE);
    +        _propertyDescriptors.add(HIVE_TABLE_NAME);
    +        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void setup(ProcessContext context) {
    +        boolean confFileProvided = 
context.getProperty(ORC_CONFIGURATION_RESOURCES).isSet();
    +        if (confFileProvided) {
    +            final String configFiles = 
context.getProperty(ORC_CONFIGURATION_RESOURCES).getValue();
    +            orcConfig = 
HiveJdbcCommon.getConfigurationFromFiles(configFiles);
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        try {
    +            final long stripeSize = 
context.getProperty(STRIPE_SIZE).asLong();
    +            final int bufferSize = 
context.getProperty(BUFFER_SIZE).asInteger();
    +            final CompressionKind compressionType = 
CompressionKind.valueOf(context.getProperty(COMPRESSION_TYPE).getValue());
    +            final AtomicReference<Schema> hiveAvroSchema = new 
AtomicReference<>(null);
    +            final AtomicInteger totalRecordCount = new AtomicInteger(0);
    +            final String fileName = 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
    +            flowFile = session.write(flowFile, new StreamCallback() {
    +                @Override
    +                public void process(final InputStream rawIn, final 
OutputStream rawOut) throws IOException {
    +                    try (final InputStream in = new 
BufferedInputStream(rawIn);
    +                         final OutputStream out = new 
BufferedOutputStream(rawOut);
    +                         final DataFileStream<GenericRecord> reader = new 
DataFileStream<>(in, new GenericDatumReader<>())) {
    +
    +                        // Create ORC schema from Avro schema
    +                        Schema avroSchema = reader.getSchema();
    +                        TypeDescription orcSchema = 
OrcUtils.getOrcField(avroSchema);
    +
    +                        if (orcConfig == null) {
    +                            orcConfig = new Configuration();
    +                        }
    +                        OrcFile.WriterOptions options = 
OrcFile.writerOptions(orcConfig)
    +                                .setSchema(orcSchema)
    +                                .stripeSize(stripeSize)
    +                                .bufferSize(bufferSize)
    +                                .compress(compressionType)
    +                                .version(OrcFile.Version.CURRENT);
    +
    +                        OrcFlowFileWriter orcWriter = new 
OrcFlowFileWriter(out, new Path(fileName), options);
    +
    +                        VectorizedRowBatch batch = 
orcSchema.createRowBatch();
    +                        int recordCount = 0;
    +                        int recordsInBatch = 0;
    +                        GenericRecord currRecord = null;
    +                        while (reader.hasNext()) {
    +                            currRecord = reader.next(currRecord);
    +                            List<Schema.Field> fields = 
currRecord.getSchema().getFields();
    +                            if (fields != null) {
    +                                MutableInt[] vectorOffsets = new 
MutableInt[fields.size()];
    +                                for (int i = 0; i < fields.size(); i++) {
    +                                    vectorOffsets[i] = new MutableInt(0);
    +                                    Schema.Field field = fields.get(i);
    +                                    Schema fieldSchema = field.schema();
    +                                    Object o = 
currRecord.get(field.name());
    +                                    try {
    +                                        
OrcUtils.putToRowBatch(batch.cols[i], vectorOffsets[i], recordsInBatch, 
fieldSchema, o);
    +                                    } catch 
(ArrayIndexOutOfBoundsException aioobe) {
    +                                        getLogger().error("Index out of 
bounds at record {} for column {}, type {}, and object {}",
    +                                                new 
Object[]{recordsInBatch, i, fieldSchema.getType().getName(), o.toString()},
    +                                                aioobe);
    +                                        throw new IOException(aioobe);
    +                                    }
    +                                }
    +                            }
    +                            recordCount++;
    +                            recordsInBatch++;
    +
    +                            if (recordsInBatch == batch.getMaxSize()) {
    +                                // add batch and start a new one
    +                                batch.size = recordsInBatch;
    +                                orcWriter.addRowBatch(batch);
    +                                batch = orcSchema.createRowBatch();
    +                                recordsInBatch = 0;
    +                            }
    +                        }
    +
    +                        // If there are records in the batch, add the batch
    +                        if (recordsInBatch > 0) {
    +                            batch.size = recordsInBatch;
    +                            orcWriter.addRowBatch(batch);
    +                        }
    +
    +                        // finished writing this record, close the writer 
(which will flush to the flow file)
    +                        orcWriter.close();
    +                        hiveAvroSchema.set(avroSchema);
    +                        totalRecordCount.set(recordCount);
    +                    }
    +                }
    +            });
    +
    +            final String hiveTableName = 
context.getProperty(HIVE_TABLE_NAME).isSet()
    +                    ? context.getProperty(HIVE_TABLE_NAME).getValue()
    +                    : 
OrcUtils.normalizeHiveTableName(hiveAvroSchema.get().getFullName());
    +            String hiveDDL = 
OrcUtils.generateHiveDDL(hiveAvroSchema.get(), hiveTableName);
    +            // Add attributes and transfer to success
    +            flowFile = session.putAttribute(flowFile, 
RECORD_COUNT_ATTRIBUTE, Integer.toString(totalRecordCount.get()));
    +            flowFile = session.putAttribute(flowFile, HIVE_DDL_ATTRIBUTE, 
hiveDDL);
    +            StringBuilder newFilename = new StringBuilder();
    +            int extensionIndex = fileName.lastIndexOf(".");
    +            if (extensionIndex != -1) {
    +                newFilename.append(fileName.substring(0, extensionIndex));
    +            } else {
    +                newFilename.append(fileName);
    +            }
    +            newFilename.append(".orc");
    +            flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), ORC_MIME_TYPE);
    +            flowFile = session.putAttribute(flowFile, 
CoreAttributes.FILENAME.key(), newFilename.toString());
    +            session.transfer(flowFile, REL_SUCCESS);
    --- End diff --
    
    Good idea, will add


> Add support for ORC format
> --------------------------
>
>                 Key: NIFI-1663
>                 URL: https://issues.apache.org/jira/browse/NIFI-1663
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>             Fix For: 1.0.0
>
>
> From the Hive/ORC wiki 
> (https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC): 
> The Optimized Row Columnar (ORC) file format provides a highly efficient way 
> to store Hive data ... Using ORC files improves performance when Hive is 
> reading, writing, and processing data.
> As users are interested in NiFi integrations with Hive (NIFI-981, NIFI-1193, 
> etc.), NiFi should be able to support ORC file format to enable users to 
> efficiently store flow files for use by Hive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to