[ https://issues.apache.org/jira/browse/NIFI-1663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15404979#comment-15404979 ]

ASF GitHub Bot commented on NIFI-1663:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/727#discussion_r73255072
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java ---
    @@ -0,0 +1,657 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hive;
    +
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hive.hcatalog.streaming.ConnectionError;
    +import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    +import org.apache.hive.hcatalog.streaming.SerializationError;
    +import org.apache.hive.hcatalog.streaming.StreamingException;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.hadoop.KerberosProperties;
    +import org.apache.nifi.hadoop.SecurityUtil;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.NiFiProperties;
    +import org.apache.nifi.util.StringUtils;
    +import org.apache.nifi.util.hive.AuthenticationFailedException;
    +import org.apache.nifi.util.hive.HiveConfigurator;
    +import org.apache.nifi.util.hive.HiveOptions;
    +import org.apache.nifi.util.hive.HiveUtils;
    +import org.apache.nifi.util.hive.HiveWriter;
    +import org.json.JSONException;
    +import org.json.JSONObject;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.Timer;
    +import java.util.TimerTask;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.regex.Pattern;
    +
    +/**
    + * This processor utilizes the Hive Streaming capability to insert data 
from the flow into a Hive database table.
    + */
    +@Tags({"hive", "streaming", "put", "database", "store"})
    +@CapabilityDescription("This processor uses Hive Streaming to send flow 
file data to an Apache Hive table. The incoming flow file is expected to be in "
    +        + "Avro format and the table must exist in Hive. Please see the 
Hive documentation for requirements on the Hive table (format, partitions, 
etc.). "
    +        + "The partition values are extracted from the Avro record based 
on the names of the partition columns as specified in the processor. ")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "hivestreaming.record.count", 
description = "The number of records from this flow file written using Hive 
Streaming.")
    +})
    +public class PutHiveStreaming extends AbstractProcessor {
    +
    +    // Attributes
    +    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = 
"hivestreaming.record.count";
    +
    +    // Validators
    +    private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject, 
value, context) -> {
    +        if (context.isExpressionLanguageSupported(subject) && 
context.isExpressionLanguagePresent(value)) {
    +            return new 
ValidationResult.Builder().subject(subject).input(value).explanation("Expression
 Language Present").valid(true).build();
    +        }
    +
    +        String reason = null;
    +        try {
    +            final int intVal = Integer.parseInt(value);
    +
    +            if (intVal < 2) {
    +                reason = "value is less than 2";
    +            }
    +        } catch (final NumberFormatException e) {
    +            reason = "value is not a valid integer";
    +        }
    +
    +        return new 
ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason
 == null).build();
    +    };
    +
    +    // Properties
    +    public static final PropertyDescriptor METASTORE_URI = new 
PropertyDescriptor.Builder()
    +            .name("hive-stream-metastore-uri")
    +            .displayName("Hive Metastore URI")
    +            .description("The URI location for the Hive Metastore. Note 
that this is not the location of the Hive Server. The default port for the "
    +                    + "Hive metastore is 9043.")
    +            .required(true)
    +            .addValidator(StandardValidators.URI_VALIDATOR)
    +            
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)")))
 // no start with / or end with /
    +            .build();
    +
    +    public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = 
new PropertyDescriptor.Builder()
    +            .name("hive-config-resources")
    +            .displayName("Hive Configuration Resources")
    +            .description("A file or comma separated list of files which 
contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
    +                    + "will search the classpath for a 'hive-site.xml' 
file or will revert to a default configuration. Note that to enable 
authentication "
    +                    + "with Kerberos e.g., the appropriate properties must 
be set in the configuration files. Please see the Hive documentation for more 
details.")
    +            .required(false)
    +            .addValidator(HiveUtils.createMultipleFilesExistValidator())
    +            .build();
    +
    +    public static final PropertyDescriptor DB_NAME = new 
PropertyDescriptor.Builder()
    +            .name("hive-stream-database-name")
    +            .displayName("Database Name")
    +            .description("The name of the database in which to put the 
data.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
    +            .name("hive-stream-table-name")
    +            .displayName("Table Name")
    +            .description("The name of the database table in which to put 
the data.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PARTITION_COLUMNS = new 
PropertyDescriptor.Builder()
    +            .name("hive-stream-partition-cols")
    +            .displayName("Partition Columns")
    +            .description("A comma-delimited list of column names on which 
the table has been partitioned. The order of values in this list must "
    +                    + "correspond exactly to the order of partition 
columns specified during the table creation.")
    +            .required(false)
    +            .expressionLanguageSupported(false)
    +            
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*")))
 // comma-separated list with non-empty entries
    +            .build();
    +
    +    public static final PropertyDescriptor AUTOCREATE_PARTITIONS = new 
PropertyDescriptor.Builder()
    +            .name("hive-stream-autocreate-partition")
    +            .displayName("Auto-Create Partitions")
    +            .description("Flag indicating whether partitions should be 
automatically created")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_OPEN_CONNECTIONS = new 
PropertyDescriptor.Builder()
    +            .name("hive-stream-max-open-connections")
    +            .displayName("Max Open Connections")
    +            .description("The maximum number of open connections that can 
be allocated from this pool at the same time, "
    +                    + "or negative for no limit.")
    +            .defaultValue("8")
    +            .required(true)
    +            .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor HEARTBEAT_INTERVAL = new 
PropertyDescriptor.Builder()
    +            .name("hive-stream-heartbeat-interval")
    +            .displayName("Heartbeat Interval")
    +            .description("Indicates that a heartbeat should be sent when 
the specified number of seconds has elapsed. "
    +                    + "A value of 0 indicates that no heartbeat should be 
sent.")
    +            .defaultValue("60")
    +            .required(true)
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor TXNS_PER_BATCH = new 
PropertyDescriptor.Builder()
    +            .name("hive-stream-transactions-per-batch")
    +            .displayName("Transactions per Batch")
    +            .description("A hint to Hive Streaming indicating how many 
transactions the processor task will need. This value must be greater than 1.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(GREATER_THAN_ONE_VALIDATOR)
    +            .defaultValue("100")
    +            .build();
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description("A FlowFile is routed to this relationship after 
the database is successfully updated")
    +            .build();
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    --- End diff --
    
    Hmm, I wonder if this is really needed, especially since REL_FAILURE could be
    looped back to the processor. Am I missing something?


> Add support for ORC format
> --------------------------
>
>                 Key: NIFI-1663
>                 URL: https://issues.apache.org/jira/browse/NIFI-1663
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>             Fix For: 1.0.0
>
>
> From the Hive/ORC wiki 
> (https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC): 
> The Optimized Row Columnar (ORC) file format provides a highly efficient way 
> to store Hive data ... Using ORC files improves performance when Hive is 
> reading, writing, and processing data.
> As users are interested in NiFi integrations with Hive (NIFI-981, NIFI-1193, 
> etc.), NiFi should be able to support ORC file format to enable users to 
> efficiently store flow files for use by Hive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to