[
https://issues.apache.org/jira/browse/NIFI-1868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15397924#comment-15397924
]
ASF GitHub Bot commented on NIFI-1868:
--------------------------------------
Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/706#discussion_r72672215
--- Diff:
nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
---
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.streaming.ConnectionError;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.SerializationError;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.hive.AuthenticationFailedException;
+import org.apache.nifi.util.hive.HiveConfigurator;
+import org.apache.nifi.util.hive.HiveOptions;
+import org.apache.nifi.util.hive.HiveUtils;
+import org.apache.nifi.util.hive.HiveWriter;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
+
+/**
+ * This processor utilizes the Hive Streaming capability to insert data from the flow into a Hive database table.
+ */
+@Tags({"hive", "streaming", "put", "database", "store"})
+@CapabilityDescription("This processor uses Hive Streaming to send flow
file data to an Apache Hive table. The incoming flow file is expected to be in "
+ + "Avro format and the table must exist in Hive. Please see the
Hive documentation for requirements on the Hive table (format, partitions,
etc.). "
+ + "The partition values are extracted from the Avro record based
on the names of the partition columns as specified in the processor. ")
+@WritesAttributes({
+ @WritesAttribute(attribute = "hivestreaming.record.count",
description = "The number of records from this flow file written using Hive
Streaming.")
+})
+public class PutHiveStreaming extends AbstractProcessor {
+
+ // Attributes
+    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
+
+ // Validators
+    private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject, value, context) -> {
+        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
+            return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+ }
+
+ String reason = null;
+ try {
+ final int intVal = Integer.parseInt(value);
+
+ if (intVal < 2) {
+ reason = "value is less than 2";
+ }
+ } catch (final NumberFormatException e) {
+ reason = "value is not a valid integer";
+ }
+
+        return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
+ };
+
+ // Properties
+    public static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
+            .name("hive-stream-metastore-uri")
+            .displayName("Hive Metastore URI")
+            .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
+                    + "Hive metastore is 9043.")
+            .required(true)
+            .addValidator(StandardValidators.URI_VALIDATOR)
+            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
+            .build();
+
+    public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
+            .name("hive-config-resources")
+            .displayName("Hive Configuration Resources")
+            .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
+                    + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
+                    + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
+            .required(false)
+            .addValidator(HiveUtils.createMultipleFilesExistValidator())
+            .build();
+
+    public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
+            .name("hive-stream-database-name")
+            .displayName("Database Name")
+            .description("The name of the database in which to put the data.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("hive-stream-table-name")
+            .displayName("Table Name")
+            .description("The name of the database table in which to put the data.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PARTITION_COLUMNS = new PropertyDescriptor.Builder()
+            .name("hive-stream-partition-cols")
+            .displayName("Partition Columns")
+            .description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must "
+                    + "correspond exactly to the order of partition columns specified during the table creation.")
+            .required(false)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))) // comma-separated list with non-empty entries
+            .build();
+
+    public static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
+            .name("hive-stream-autocreate-partition")
+            .displayName("Auto-Create Partitions")
+            .description("Flag indicating whether partitions should be automatically created")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    public static final PropertyDescriptor MAX_OPEN_CONNECTIONS = new PropertyDescriptor.Builder()
+            .name("hive-stream-max-open-connections")
+            .displayName("Max Open Connections")
+            .description("The maximum number of open connections that can be allocated from this pool at the same time, "
+                    + "or negative for no limit.")
+            .defaultValue("8")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    public static final PropertyDescriptor HEARTBEAT_INTERVAL = new PropertyDescriptor.Builder()
+            .name("hive-stream-heartbeat-interval")
+            .displayName("Heartbeat Interval")
+            .description("Indicates that a heartbeat should be sent when the specified number of seconds has elapsed. "
+                    + "A value of 0 indicates that no heartbeat should be sent.")
+            .defaultValue("60")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    public static final PropertyDescriptor TXNS_PER_BATCH = new PropertyDescriptor.Builder()
+            .name("hive-stream-transactions-per-batch")
+            .displayName("Transactions per Batch")
+            .description("A hint to Hive Streaming indicating how many transactions the processor task will need. This value must be greater than 1.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(GREATER_THAN_ONE_VALIDATOR)
+            .defaultValue("100")
+            .build();
+
+ // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the database is successfully updated")
+            .build();
+    public static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail.")
+            .build();
+
+ private final static List<PropertyDescriptor> propertyDescriptors;
+ private final static Set<Relationship> relationships;
+
+ private static final long TICKET_RENEWAL_PERIOD = 60000;
+
+ protected KerberosProperties kerberosProperties;
+
+    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
+ protected volatile UserGroupInformation ugi;
+
+ protected final AtomicBoolean isInitialized = new AtomicBoolean(false);
+
+ protected HiveOptions options;
+ protected ExecutorService callTimeoutPool;
+ protected transient Timer heartBeatTimer;
+ protected AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
+ protected Map<HiveEndPoint, HiveWriter> allWriters;
+
+
+    /*
+     * Will ensure that the list of property descriptors is built only once.
+     * Will also create a Set of relationships
+     */
+ static {
+ propertyDescriptors = new ArrayList<>();
+ propertyDescriptors.add(METASTORE_URI);
+ propertyDescriptors.add(HIVE_CONFIGURATION_RESOURCES);
+ propertyDescriptors.add(DB_NAME);
+ propertyDescriptors.add(TABLE_NAME);
+ propertyDescriptors.add(PARTITION_COLUMNS);
+ propertyDescriptors.add(AUTOCREATE_PARTITIONS);
+ propertyDescriptors.add(MAX_OPEN_CONNECTIONS);
+ propertyDescriptors.add(HEARTBEAT_INTERVAL);
+ propertyDescriptors.add(TXNS_PER_BATCH);
+
+ Set<Relationship> _relationships = new HashSet<>();
+ _relationships.add(REL_SUCCESS);
+ _relationships.add(REL_FAILURE);
+ _relationships.add(REL_RETRY);
+ relationships = Collections.unmodifiableSet(_relationships);
+ }
+
+ @Override
+ protected void init(ProcessorInitializationContext context) {
+ kerberosProperties = getKerberosProperties();
+ propertyDescriptors.add(kerberosProperties.getKerberosPrincipal());
+ propertyDescriptors.add(kerberosProperties.getKerberosKeytab());
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+
+ @OnScheduled
+ public void setup(final ProcessContext context) {
+ ComponentLog log = getLogger();
+
+        final String metastoreUri = context.getProperty(METASTORE_URI).getValue();
+        final String dbName = context.getProperty(DB_NAME).getValue();
+        final String tableName = context.getProperty(TABLE_NAME).getValue();
+        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
+        final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
+        final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger();
+        final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger();
+        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
+        final Configuration hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
+
+ // add any dynamic properties to the Hive configuration
+ for (final Map.Entry<PropertyDescriptor, String> entry :
context.getProperties().entrySet()) {
+ final PropertyDescriptor descriptor = entry.getKey();
+ if (descriptor.isDynamic()) {
+ hiveConfig.set(descriptor.getName(), entry.getValue());
+ }
+ }
+
+ options = new HiveOptions(metastoreUri, dbName, tableName)
+ .withTxnsPerBatch(txnsPerBatch)
+ .withAutoCreatePartitions(autoCreatePartitions)
+ .withMaxOpenConnections(maxConnections)
+ .withHeartBeatInterval(heartbeatInterval);
+
+ if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
+            final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
+            final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
+
+            log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{principal, keyTab});
+            try {
+                ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab, TICKET_RENEWAL_PERIOD, log);
+            } catch (AuthenticationFailedException ae) {
+                throw new ProcessException("Kerberos authentication failed for Hive Streaming", ae);
+            }
+            log.info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab});
+            options = options.withKerberosPrincipal(principal).withKerberosKeytab(keyTab);
+ }
+
+ allWriters = new ConcurrentHashMap<>();
+ String timeoutName = "put-hive-streaming-%d";
+ this.callTimeoutPool = Executors.newFixedThreadPool(1,
+                new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+
+ sendHeartBeat.set(true);
+ heartBeatTimer = new Timer();
+ setupHeartBeatTimer();
+ }
+
+ @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final ComponentLog log = getLogger();
+ try {
+ final List<String> partitionColumnList;
+            String partitionColumns = context.getProperty(PARTITION_COLUMNS).getValue();
+ if (StringUtils.isEmpty(partitionColumns)) {
+ partitionColumnList = Collections.emptyList();
+ } else {
+ String[] partitionCols = partitionColumns.split(",");
+                partitionColumnList = new ArrayList<>(partitionCols.length);
+ for (String col : partitionCols) {
+ partitionColumnList.add(col.trim());
+ }
+ }
+
+            // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
+            ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
+            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+
+ int recordCount = 0;
+ final List<HiveStreamingRecord> records = new LinkedList<>();
+
+ session.read(flowFile, in -> {
+
+ try (final DataFileStream<GenericRecord> reader = new
DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+
+ GenericRecord currRecord;
+ while (reader.hasNext()) {
+ currRecord = reader.next();
--- End diff ---
Minor point, I think there's a call to reader.next() that takes in a record instance to reuse, which could save some object creation for a large number of records.
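
For reference, a minimal sketch of the reuse-style read loop being suggested (variable names follow the diff above; this is illustrative, not the committed change):

    GenericRecord currRecord = null;
    while (reader.hasNext()) {
        // next(reuse) re-populates the supplied GenericRecord instead of
        // allocating a fresh instance for every Avro record in the flow file
        currRecord = reader.next(currRecord);
        // ... convert currRecord to a HiveStreamingRecord and collect it ...
    }

The first call passes null, so Avro allocates the initial record; subsequent iterations reuse it.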
> Add support for Hive Streaming
> ------------------------------
>
> Key: NIFI-1868
> URL: https://issues.apache.org/jira/browse/NIFI-1868
> Project: Apache NiFi
> Issue Type: New Feature
> Reporter: Matt Burgess
> Assignee: Matt Burgess
> Fix For: 1.0.0
>
>
> Traditionally adding new data into Hive requires gathering a large amount of
> data onto HDFS and then periodically adding a new partition. This is
> essentially a “batch insertion”. Insertion of new data into an existing
> partition is not permitted. Hive Streaming API allows data to be pumped
> continuously into Hive. The incoming data can be continuously committed in
> small batches of records into an existing Hive partition or table. Once data
> is committed it becomes immediately visible to all Hive queries initiated
> subsequently.
> This case is to add a PutHiveStreaming processor to NiFi, to leverage the
> Hive Streaming API to allow continuous streaming of data into a Hive
> partition/table.
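
For context, a rough sketch of how the underlying Hive Streaming API (org.apache.hive.hcatalog.streaming) is typically driven; the metastore URI, table, partition value, and column names below are hypothetical, not taken from the processor:

    import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
    import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    import org.apache.hive.hcatalog.streaming.StreamingConnection;
    import org.apache.hive.hcatalog.streaming.TransactionBatch;

    import java.util.Arrays;

    public class HiveStreamingSketch {
        public static void main(String[] args) throws Exception {
            // The endpoint names the metastore, database, table, and partition values
            HiveEndPoint endPoint = new HiveEndPoint("thrift://metastore-host:9083",
                    "default", "web_logs", Arrays.asList("2016-07-28"));
            StreamingConnection conn = endPoint.newConnection(true); // true = auto-create the partition
            DelimitedInputWriter writer = new DelimitedInputWriter(new String[]{"id", "msg"}, ",", endPoint);

            // A transaction batch amortizes metastore calls across many small commits
            TransactionBatch txnBatch = conn.fetchTransactionBatch(10, writer);
            txnBatch.beginNextTransaction();
            txnBatch.write("1,hello".getBytes());
            txnBatch.write("2,world".getBytes());
            txnBatch.commit(); // rows are visible to Hive queries started after this point
            txnBatch.close();
            conn.close();
        }
    }

The PutHiveStreaming processor builds on this same API, keeping one HiveWriter per HiveEndPoint (the allWriters map in the diff above).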
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)