[
https://issues.apache.org/jira/browse/NIFI-1663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15404979#comment-15404979
]
ASF GitHub Bot commented on NIFI-1663:
--------------------------------------
Github user olegz commented on a diff in the pull request:
https://github.com/apache/nifi/pull/727#discussion_r73255072
--- Diff:
nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
---
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.streaming.ConnectionError;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.SerializationError;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.hive.AuthenticationFailedException;
+import org.apache.nifi.util.hive.HiveConfigurator;
+import org.apache.nifi.util.hive.HiveOptions;
+import org.apache.nifi.util.hive.HiveUtils;
+import org.apache.nifi.util.hive.HiveWriter;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
+
+/**
+ * This processor utilizes the Hive Streaming capability to insert data
from the flow into a Hive database table.
+ */
+@Tags({"hive", "streaming", "put", "database", "store"})
+@CapabilityDescription("This processor uses Hive Streaming to send flow
file data to an Apache Hive table. The incoming flow file is expected to be in "
+ + "Avro format and the table must exist in Hive. Please see the
Hive documentation for requirements on the Hive table (format, partitions,
etc.). "
+ + "The partition values are extracted from the Avro record based
on the names of the partition columns as specified in the processor. ")
+@WritesAttributes({
+ @WritesAttribute(attribute = "hivestreaming.record.count",
description = "The number of records from this flow file written using Hive
Streaming.")
+})
+public class PutHiveStreaming extends AbstractProcessor {
+
+ // Attributes
+ public static final String HIVE_STREAMING_RECORD_COUNT_ATTR =
"hivestreaming.record.count";
+
+ // Validators
+ private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject,
value, context) -> {
+ if (context.isExpressionLanguageSupported(subject) &&
context.isExpressionLanguagePresent(value)) {
+ return new
ValidationResult.Builder().subject(subject).input(value).explanation("Expression
Language Present").valid(true).build();
+ }
+
+ String reason = null;
+ try {
+ final int intVal = Integer.parseInt(value);
+
+ if (intVal < 2) {
+ reason = "value is less than 2";
+ }
+ } catch (final NumberFormatException e) {
+ reason = "value is not a valid integer";
+ }
+
+ return new
ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason
== null).build();
+ };
+
+ // Properties
+ public static final PropertyDescriptor METASTORE_URI = new
PropertyDescriptor.Builder()
+ .name("hive-stream-metastore-uri")
+ .displayName("Hive Metastore URI")
+ .description("The URI location for the Hive Metastore. Note
that this is not the location of the Hive Server. The default port for the "
+ + "Hive metastore is 9043.")
+ .required(true)
+ .addValidator(StandardValidators.URI_VALIDATOR)
+
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)")))
// no start with / or end with /
+ .build();
+
+ public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES =
new PropertyDescriptor.Builder()
+ .name("hive-config-resources")
+ .displayName("Hive Configuration Resources")
+ .description("A file or comma separated list of files which
contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
+ + "will search the classpath for a 'hive-site.xml'
file or will revert to a default configuration. Note that to enable
authentication "
+ + "with Kerberos e.g., the appropriate properties must
be set in the configuration files. Please see the Hive documentation for more
details.")
+ .required(false)
+ .addValidator(HiveUtils.createMultipleFilesExistValidator())
+ .build();
+
+ public static final PropertyDescriptor DB_NAME = new
PropertyDescriptor.Builder()
+ .name("hive-stream-database-name")
+ .displayName("Database Name")
+ .description("The name of the database in which to put the
data.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor TABLE_NAME = new
PropertyDescriptor.Builder()
+ .name("hive-stream-table-name")
+ .displayName("Table Name")
+ .description("The name of the database table in which to put
the data.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor PARTITION_COLUMNS = new
PropertyDescriptor.Builder()
+ .name("hive-stream-partition-cols")
+ .displayName("Partition Columns")
+ .description("A comma-delimited list of column names on which
the table has been partitioned. The order of values in this list must "
+ + "correspond exactly to the order of partition
columns specified during the table creation.")
+ .required(false)
+ .expressionLanguageSupported(false)
+
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*")))
// comma-separated list with non-empty entries
+ .build();
+
+ public static final PropertyDescriptor AUTOCREATE_PARTITIONS = new
PropertyDescriptor.Builder()
+ .name("hive-stream-autocreate-partition")
+ .displayName("Auto-Create Partitions")
+ .description("Flag indicating whether partitions should be
automatically created")
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
+
+ public static final PropertyDescriptor MAX_OPEN_CONNECTIONS = new
PropertyDescriptor.Builder()
+ .name("hive-stream-max-open-connections")
+ .displayName("Max Open Connections")
+ .description("The maximum number of open connections that can
be allocated from this pool at the same time, "
+ + "or negative for no limit.")
+ .defaultValue("8")
+ .required(true)
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .sensitive(false)
+ .build();
+
+ public static final PropertyDescriptor HEARTBEAT_INTERVAL = new
PropertyDescriptor.Builder()
+ .name("hive-stream-heartbeat-interval")
+ .displayName("Heartbeat Interval")
+ .description("Indicates that a heartbeat should be sent when
the specified number of seconds has elapsed. "
+ + "A value of 0 indicates that no heartbeat should be
sent.")
+ .defaultValue("60")
+ .required(true)
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .sensitive(false)
+ .build();
+
+ public static final PropertyDescriptor TXNS_PER_BATCH = new
PropertyDescriptor.Builder()
+ .name("hive-stream-transactions-per-batch")
+ .displayName("Transactions per Batch")
+ .description("A hint to Hive Streaming indicating how many
transactions the processor task will need. This value must be greater than 1.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(GREATER_THAN_ONE_VALIDATOR)
+ .defaultValue("100")
+ .build();
+
+ // Relationships
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("A FlowFile is routed to this relationship after
the database is successfully updated")
+ .build();
+ public static final Relationship REL_RETRY = new Relationship.Builder()
--- End diff --
Hmm, I wonder whether REL_RETRY is really needed, especially since REL_FAILURE could be
looped back to the processor. Am I missing something?
> Add support for ORC format
> --------------------------
>
> Key: NIFI-1663
> URL: https://issues.apache.org/jira/browse/NIFI-1663
> Project: Apache NiFi
> Issue Type: New Feature
> Reporter: Matt Burgess
> Assignee: Matt Burgess
> Fix For: 1.0.0
>
>
> From the Hive/ORC wiki
> (https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC):
> The Optimized Row Columnar (ORC) file format provides a highly efficient way
> to store Hive data ... Using ORC files improves performance when Hive is
> reading, writing, and processing data.
> Because users are interested in NiFi integrations with Hive (NIFI-981, NIFI-1193,
> etc.), NiFi should support the ORC file format so that users can
> efficiently store flow files for use by Hive.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)