[
https://issues.apache.org/jira/browse/NIFI-1868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15397953#comment-15397953
]
ASF GitHub Bot commented on NIFI-1868:
--------------------------------------
Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/706#discussion_r72673853
--- Diff:
nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
---
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.streaming.ConnectionError;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.SerializationError;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.hive.AuthenticationFailedException;
+import org.apache.nifi.util.hive.HiveConfigurator;
+import org.apache.nifi.util.hive.HiveOptions;
+import org.apache.nifi.util.hive.HiveUtils;
+import org.apache.nifi.util.hive.HiveWriter;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
+
+/**
+ * This processor utilizes the Hive Streaming capability to insert data
from the flow into a Hive database table.
+ */
+@Tags({"hive", "streaming", "put", "database", "store"})
+@CapabilityDescription("This processor uses Hive Streaming to send flow
file data to an Apache Hive table. The incoming flow file is expected to be in "
+ + "Avro format and the table must exist in Hive. Please see the
Hive documentation for requirements on the Hive table (format, partitions,
etc.). "
+ + "The partition values are extracted from the Avro record based
on the names of the partition columns as specified in the processor. ")
+@WritesAttributes({
+ @WritesAttribute(attribute = "hivestreaming.record.count",
description = "The number of records from this flow file written using Hive
Streaming.")
+})
+public class PutHiveStreaming extends AbstractProcessor {
+
+ // Attributes
+ public static final String HIVE_STREAMING_RECORD_COUNT_ATTR =
"hivestreaming.record.count";
+
+ // Validators
+ private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject,
value, context) -> {
+ if (context.isExpressionLanguageSupported(subject) &&
context.isExpressionLanguagePresent(value)) {
+ return new
ValidationResult.Builder().subject(subject).input(value).explanation("Expression
Language Present").valid(true).build();
+ }
+
+ String reason = null;
+ try {
+ final int intVal = Integer.parseInt(value);
+
+ if (intVal < 2) {
+ reason = "value is less than 2";
+ }
+ } catch (final NumberFormatException e) {
+ reason = "value is not a valid integer";
+ }
+
+ return new
ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason
== null).build();
+ };
+
+ // Properties
+ public static final PropertyDescriptor METASTORE_URI = new
PropertyDescriptor.Builder()
+ .name("hive-stream-metastore-uri")
+ .displayName("Hive Metastore URI")
+ .description("The URI location for the Hive Metastore. Note
that this is not the location of the Hive Server. The default port for the "
+ + "Hive metastore is 9043.")
+ .required(true)
+ .addValidator(StandardValidators.URI_VALIDATOR)
+
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)")))
// no start with / or end with /
+ .build();
+
+ public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES =
new PropertyDescriptor.Builder()
+ .name("hive-config-resources")
+ .displayName("Hive Configuration Resources")
+ .description("A file or comma separated list of files which
contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
+ + "will search the classpath for a 'hive-site.xml'
file or will revert to a default configuration. Note that to enable
authentication "
+ + "with Kerberos e.g., the appropriate properties must
be set in the configuration files. Please see the Hive documentation for more
details.")
+ .required(false)
+ .addValidator(HiveUtils.createMultipleFilesExistValidator())
+ .build();
+
+ public static final PropertyDescriptor DB_NAME = new
PropertyDescriptor.Builder()
+ .name("hive-stream-database-name")
+ .displayName("Database Name")
+ .description("The name of the database in which to put the
data.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor TABLE_NAME = new
PropertyDescriptor.Builder()
+ .name("hive-stream-table-name")
+ .displayName("Table Name")
+ .description("The name of the database table in which to put
the data.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor PARTITION_COLUMNS = new
PropertyDescriptor.Builder()
+ .name("hive-stream-partition-cols")
+ .displayName("Partition Columns")
+ .description("A comma-delimited list of column names on which
the table has been partitioned. The order of values in this list must "
+ + "correspond exactly to the order of partition
columns specified during the table creation.")
+ .required(false)
+ .expressionLanguageSupported(false)
+
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*")))
// comma-separated list with non-empty entries
+ .build();
+
+ public static final PropertyDescriptor AUTOCREATE_PARTITIONS = new
PropertyDescriptor.Builder()
+ .name("hive-stream-autocreate-partition")
+ .displayName("Auto-Create Partitions")
+ .description("Flag indicating whether partitions should be
automatically created")
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
+
+ public static final PropertyDescriptor MAX_OPEN_CONNECTIONS = new
PropertyDescriptor.Builder()
+ .name("hive-stream-max-open-connections")
+ .displayName("Max Open Connections")
+ .description("The maximum number of open connections that can
be allocated from this pool at the same time, "
+ + "or negative for no limit.")
+ .defaultValue("8")
+ .required(true)
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .sensitive(false)
+ .build();
+
+ public static final PropertyDescriptor HEARTBEAT_INTERVAL = new
PropertyDescriptor.Builder()
+ .name("hive-stream-heartbeat-interval")
+ .displayName("Heartbeat Interval")
+ .description("Indicates that a heartbeat should be sent when
the specified number of seconds has elapsed. "
+ + "A value of 0 indicates that no heartbeat should be
sent.")
+ .defaultValue("60")
+ .required(true)
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .sensitive(false)
+ .build();
+
+ public static final PropertyDescriptor TXNS_PER_BATCH = new
PropertyDescriptor.Builder()
+ .name("hive-stream-transactions-per-batch")
+ .displayName("Transactions per Batch")
+ .description("A hint to Hive Streaming indicating how many
transactions the processor task will need. This value must be greater than 1.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(GREATER_THAN_ONE_VALIDATOR)
+ .defaultValue("100")
+ .build();
+
+ // Relationships
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("A FlowFile is routed to this relationship after
the database is successfully updated")
+ .build();
+ public static final Relationship REL_RETRY = new Relationship.Builder()
+ .name("retry")
+ .description("A FlowFile is routed to this relationship if the
database cannot be updated but attempting the operation again may succeed")
+ .build();
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("A FlowFile is routed to this relationship if the
database cannot be updated and retrying the operation will also fail.")
+ .build();
+
+ private final static List<PropertyDescriptor> propertyDescriptors;
+ private final static Set<Relationship> relationships;
+
+ private static final long TICKET_RENEWAL_PERIOD = 60000;
+
+ protected KerberosProperties kerberosProperties;
+
+ protected volatile HiveConfigurator hiveConfigurator = new
HiveConfigurator();
+ protected volatile UserGroupInformation ugi;
+
+ protected final AtomicBoolean isInitialized = new AtomicBoolean(false);
+
+ protected HiveOptions options;
+ protected ExecutorService callTimeoutPool;
+ protected transient Timer heartBeatTimer;
+ protected AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
+ protected Map<HiveEndPoint, HiveWriter> allWriters;
+
+
+ /*
+ * Will ensure that the list of property descriptors is built only
once.
+ * Will also create a Set of relationships
+ */
+ static {
+ propertyDescriptors = new ArrayList<>();
+ propertyDescriptors.add(METASTORE_URI);
+ propertyDescriptors.add(HIVE_CONFIGURATION_RESOURCES);
+ propertyDescriptors.add(DB_NAME);
+ propertyDescriptors.add(TABLE_NAME);
+ propertyDescriptors.add(PARTITION_COLUMNS);
+ propertyDescriptors.add(AUTOCREATE_PARTITIONS);
+ propertyDescriptors.add(MAX_OPEN_CONNECTIONS);
+ propertyDescriptors.add(HEARTBEAT_INTERVAL);
+ propertyDescriptors.add(TXNS_PER_BATCH);
+
+ Set<Relationship> _relationships = new HashSet<>();
+ _relationships.add(REL_SUCCESS);
+ _relationships.add(REL_FAILURE);
+ _relationships.add(REL_RETRY);
+ relationships = Collections.unmodifiableSet(_relationships);
+ }
+
+ @Override
+ protected void init(ProcessorInitializationContext context) {
+ kerberosProperties = getKerberosProperties();
+ propertyDescriptors.add(kerberosProperties.getKerberosPrincipal());
+ propertyDescriptors.add(kerberosProperties.getKerberosKeytab());
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+
+ @OnScheduled
+ public void setup(final ProcessContext context) {
+ ComponentLog log = getLogger();
+
+ final String metastoreUri =
context.getProperty(METASTORE_URI).getValue();
+ final String dbName = context.getProperty(DB_NAME).getValue();
+ final String tableName =
context.getProperty(TABLE_NAME).getValue();
+ final boolean autoCreatePartitions =
context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
+ final Integer maxConnections =
context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
+ final Integer heartbeatInterval =
context.getProperty(HEARTBEAT_INTERVAL).asInteger();
+ final Integer txnsPerBatch =
context.getProperty(TXNS_PER_BATCH).asInteger();
+ final String configFiles =
context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
+ final Configuration hiveConfig =
hiveConfigurator.getConfigurationFromFiles(configFiles);
+
+ // add any dynamic properties to the Hive configuration
+ for (final Map.Entry<PropertyDescriptor, String> entry :
context.getProperties().entrySet()) {
+ final PropertyDescriptor descriptor = entry.getKey();
+ if (descriptor.isDynamic()) {
+ hiveConfig.set(descriptor.getName(), entry.getValue());
+ }
+ }
+
+ options = new HiveOptions(metastoreUri, dbName, tableName)
+ .withTxnsPerBatch(txnsPerBatch)
+ .withAutoCreatePartitions(autoCreatePartitions)
+ .withMaxOpenConnections(maxConnections)
+ .withHeartBeatInterval(heartbeatInterval);
+
+ if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
+ final String principal =
context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
+ final String keyTab =
context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
+
+ log.info("Hive Security Enabled, logging in as principal {}
with keytab {}", new Object[]{principal, keyTab});
+ try {
+ ugi = hiveConfigurator.authenticate(hiveConfig, principal,
keyTab, TICKET_RENEWAL_PERIOD, log);
+ } catch (AuthenticationFailedException ae) {
+ throw new ProcessException("Kerberos authentication failed
for Hive Streaming", ae);
+ }
+ log.info("Successfully logged in as principal {} with keytab
{}", new Object[]{principal, keyTab});
+ options =
options.withKerberosPrincipal(principal).withKerberosKeytab(keyTab);
+ }
+
+ allWriters = new ConcurrentHashMap<>();
+ String timeoutName = "put-hive-streaming-%d";
+ this.callTimeoutPool = Executors.newFixedThreadPool(1,
+ new
ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+
+ sendHeartBeat.set(true);
+ heartBeatTimer = new Timer();
+ setupHeartBeatTimer();
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final ComponentLog log = getLogger();
+ try {
+ final List<String> partitionColumnList;
+ String partitionColumns =
context.getProperty(PARTITION_COLUMNS).getValue();
+ if (StringUtils.isEmpty(partitionColumns)) {
+ partitionColumnList = Collections.emptyList();
+ } else {
+ String[] partitionCols = partitionColumns.split(",");
+ partitionColumnList = new
ArrayList<>(partitionCols.length);
+ for (String col : partitionCols) {
+ partitionColumnList.add(col.trim());
+ }
+ }
+
+ // Store the original class loader, then explicitly set it to
this class's classloader (for use by the Hive Metastore)
+ ClassLoader originalClassloader =
Thread.currentThread().getContextClassLoader();
+
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+
+ int recordCount = 0;
+ final List<HiveStreamingRecord> records = new LinkedList<>();
+
+ session.read(flowFile, in -> {
+
+ try (final DataFileStream<GenericRecord> reader = new
DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+
+ GenericRecord currRecord;
+ while (reader.hasNext()) {
+ currRecord = reader.next();
+ List<String> partitionValues = new ArrayList<>();
+
+ for (String partition : partitionColumnList) {
+ Object partitionValue =
currRecord.get(partition);
+ if (partitionValue == null) {
+ throw new IOException("Partition column '"
+ partition + "' not found in Avro record");
+ }
+ partitionValues.add(partitionValue.toString());
+ }
+
+ List<Schema.Field> fields =
currRecord.getSchema().getFields();
+ if (fields != null) {
+ JSONObject obj = new JSONObject();
+ for (Schema.Field field : fields) {
+ String fieldName = field.name();
+ // Skip fields that are partition columns,
we extracted those values above to create an EndPoint
+ if
(!partitionColumnList.contains(fieldName)) {
+ Object value =
currRecord.get(fieldName);
+ try {
+ obj.put(fieldName, value);
+ } catch (JSONException je) {
+ throw new IOException(je);
+ }
+ }
+ }
+ records.add(new
HiveStreamingRecord(partitionValues, obj));
+ }
+ }
+ }
+ });
+
+ // Write all records to Hive Streaming
+ for (HiveStreamingRecord record : records) {
--- End diff --
I don't know much about how the HiveWriter works, but would there be any
benefit to writing the records as you go in the above loop, rather than reading
them all into memory and then writing them?
If we leave it as is, maybe we can just add a note to the processor description
saying that the full contents of the incoming Avro file will be read into
memory; or, if this processor/Hive wouldn't perform well with large Avro files,
maybe we should mention that instead.
> Add support for Hive Streaming
> ------------------------------
>
> Key: NIFI-1868
> URL: https://issues.apache.org/jira/browse/NIFI-1868
> Project: Apache NiFi
> Issue Type: New Feature
> Reporter: Matt Burgess
> Assignee: Matt Burgess
> Fix For: 1.0.0
>
>
> Traditionally, adding new data into Hive requires gathering a large amount of
> data onto HDFS and then periodically adding a new partition. This is
> essentially a “batch insertion”. Insertion of new data into an existing
> partition is not permitted. The Hive Streaming API allows data to be pumped
> continuously into Hive. The incoming data can be continuously committed in
> small batches of records into an existing Hive partition or table. Once data
> is committed it becomes immediately visible to all Hive queries initiated
> subsequently.
> This case is to add a PutHiveStreaming processor to NiFi, to leverage the
> Hive Streaming API to allow continuous streaming of data into a Hive
> partition/table.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)