[
https://issues.apache.org/jira/browse/NIFI-1663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15344454#comment-15344454
]
ASF GitHub Bot commented on NIFI-1663:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/477#discussion_r68071005
--- Diff:
nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java
---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.hive.HiveJdbcCommon;
+import org.apache.nifi.util.orc.OrcFlowFileWriter;
+import org.apache.nifi.util.orc.OrcUtils;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * The ConvertAvroToORC processor takes an Avro-formatted flow file as
input and converts it into ORC format.
+ */
+@SideEffectFree
+@SupportsBatching
+@Tags({"avro", "orc", "hive", "convert"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Converts an Avro record into ORC file format. This
processor provides a direct mapping of an Avro record to an ORC record, such "
+ + "that the resulting ORC file will have the same hierarchical
structure as the Avro document. If an incoming FlowFile contains a stream of "
+ + "multiple Avro records, the resultant FlowFile will contain an
ORC file containing all of the Avro records. If an incoming FlowFile does "
+ + "not contain any records, an empty ORC file is the output.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "mime.type", description = "Sets the
mime type to application/octet-stream"),
+ @WritesAttribute(attribute = "filename", description = "Sets the
filename to the existing filename with the extension replaced by / added to by
.orc"),
+ @WritesAttribute(attribute = "record.count", description = "Sets
the number of records in the ORC file."),
+ @WritesAttribute(attribute = "hive.ddl", description = "Creates a
partial Hive DDL statement for creating a table in Hive from this ORC file. "
+ + "This can be used in ReplaceText for setting the content
to the DDL. To make it valid DDL, add \"LOCATION
'<path_to_orc_file_in_hdfs>'\", where "
+ + "the path is the directory that contains this ORC file
on HDFS. For example, ConvertAvroToORC can send flow files to a PutHDFS
processor to send the file to "
+ + "HDFS, then to a ReplaceText to set the content to this
DDL (plus the LOCATION clause as described), then to PutHiveQL processor to
create the table "
+ + "if it doesn't exist.")
+})
+public class ConvertAvroToORC extends AbstractProcessor {
+
+ // Attributes
+ public static final String ORC_MIME_TYPE = "application/octet-stream";
+ public static final String HIVE_DDL_ATTRIBUTE = "hive.ddl";
+ public static final String RECORD_COUNT_ATTRIBUTE = "record.count";
+
+
+ // Properties
+ public static final PropertyDescriptor ORC_CONFIGURATION_RESOURCES =
new PropertyDescriptor.Builder()
+ .name("orc-config-resources")
+ .displayName("ORC Configuration Resources")
+ .description("A file or comma separated list of files which
contains the ORC configuration (hive-site.xml, e.g.). Without this, Hadoop "
+ + "will search the classpath for a 'hive-site.xml'
file or will revert to a default configuration. Please see the ORC
documentation for more details.")
+
.required(false).addValidator(HiveJdbcCommon.createMultipleFilesExistValidator()).build();
+
+ public static final PropertyDescriptor STRIPE_SIZE = new
PropertyDescriptor.Builder()
+ .name("orc-stripe-size")
+ .displayName("Stripe Size")
+ .description("The size of the memory buffer (in bytes) for
writing stripes to an ORC file")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("100000")
+ .build();
+
+ public static final PropertyDescriptor BUFFER_SIZE = new
PropertyDescriptor.Builder()
+ .name("orc-buffer-size")
+ .displayName("Buffer Size")
+ .description("The maximum size of the memory buffers (in
bytes) used for compressing and storing a stripe in memory. This is a hint to
the ORC writer, "
--- End diff --
Should use data sizes here as well.
> Add support for ORC format
> --------------------------
>
> Key: NIFI-1663
> URL: https://issues.apache.org/jira/browse/NIFI-1663
> Project: Apache NiFi
> Issue Type: New Feature
> Reporter: Matt Burgess
> Assignee: Matt Burgess
> Fix For: 1.0.0
>
>
> From the Hive/ORC wiki
> (https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC):
> The Optimized Row Columnar (ORC) file format provides a highly efficient way
> to store Hive data ... Using ORC files improves performance when Hive is
> reading, writing, and processing data.
> As users are interested in NiFi integrations with Hive (NIFI-981, NIFI-1193,
> etc.), NiFi should be able to support ORC file format to enable users to
> efficiently store flow files for use by Hive.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)