[ 
https://issues.apache.org/jira/browse/NIFI-981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15250522#comment-15250522
 ] 

ASF GitHub Bot commented on NIFI-981:
-------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/372#discussion_r60468517
  
    --- Diff: 
nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
 ---
    @@ -0,0 +1,320 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hive;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.dbcp.hive.HiveDBCPService;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.math.BigDecimal;
    +import java.nio.charset.Charset;
    +import java.sql.Connection;
    +import java.sql.Date;
    +import java.sql.PreparedStatement;
    +import java.sql.SQLException;
    +import java.sql.SQLNonTransientException;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.sql.Types;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +
    +@SeeAlso(ExecuteHiveQL.class)
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"sql", "hive", "put", "database", "update", "insert"})
    +@CapabilityDescription("Executes a HiveQL DDL/DML command (UPDATE, INSERT, 
e.g.). The content of an incoming FlowFile is expected to be the HiveQL command 
"
    +        + "to execute. The HiveQL command may use the ? to escape 
parameters. In this case, the parameters to use must exist as FlowFile 
attributes "
    +        + "with the naming convention hiveql.args.N.type and 
hiveql.args.N.value, where N is a positive integer. The hiveql.args.N.type is 
expected to be "
    +        + "a number indicating the JDBC Type. The content of the FlowFile 
is expected to be in UTF-8 format.")
    +@ReadsAttributes({
    +        @ReadsAttribute(attribute = "hiveql.args.N.type", description = 
"Incoming FlowFiles are expected to be parameterized HiveQL statements. The 
type of each Parameter is specified as an integer "
    +                + "that represents the JDBC Type of the parameter."),
    +        @ReadsAttribute(attribute = "hiveql.args.N.value", description = 
"Incoming FlowFiles are expected to be parameterized HiveQL statements. The 
value of the Parameters are specified as "
    +                + "hiveql.args.1.value, hiveql.args.2.value, 
hiveql.args.3.value, and so on. The type of the hiveql.args.1.value Parameter 
is specified by the hiveql.args.1.type attribute.")
    +})
    +public class PutHiveQL extends AbstractHiveQLProcessor {
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("Batch Size")
    +            .description("The preferred number of FlowFiles to put to the 
database in a single transaction")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("100")
    +            .build();
    +
    +    public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
    +            .name("Character Set")
    +            .description("Specifies the character set of the record data.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description("A FlowFile is routed to this relationship after 
the database is successfully updated")
    +            .build();
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("A FlowFile is routed to this relationship if the 
database cannot be updated but attempting the operation again may succeed")
    +            .build();
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder()
    +            .name("failure")
    +            .description("A FlowFile is routed to this relationship if the 
database cannot be updated and retrying the operation will also fail, "
    +                    + "such as an invalid query or an integrity constraint 
violation")
    +            .build();
    +
    +    private static final Pattern HIVEQL_TYPE_ATTRIBUTE_PATTERN = 
Pattern.compile("hiveql\\.args\\.(\\d+)\\.type");
    +    private static final Pattern NUMBER_PATTERN = 
Pattern.compile("-?\\d+");
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +    private final static Set<Relationship> relationships;
    +
    +    /*
    +     * Will ensure that the list of property descriptors is build only 
once.
    +     * Will also create a Set of relationships
    +     */
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.add(HIVE_DBCP_SERVICE);
    +        _propertyDescriptors.add(BATCH_SIZE);
    +        _propertyDescriptors.add(CHARSET);
    +        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        _relationships.add(REL_RETRY);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final List<FlowFile> flowFiles = session.get(batchSize);
    +
    +        if (flowFiles.isEmpty()) {
    +            return;
    +        }
    +
    +        final long startNanos = System.nanoTime();
    +        final Charset charset = 
Charset.forName(context.getProperty(CHARSET).getValue());
    +        final HiveDBCPService dbcpService = 
context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
    +        try (final Connection conn = dbcpService.getConnection()) {
    +
    +            for (FlowFile flowFile : flowFiles) {
    +                try {
    +                    final String hiveQL = getHiveQL(session, flowFile, 
charset);
    +                    final PreparedStatement stmt = 
conn.prepareStatement(hiveQL);
    +                    setParameters(stmt, flowFile.getAttributes());
    +
    +                    // Execute the statement
    +                    stmt.execute();
    +
    +                    // Determine the database URL from the connection 
metadata
    +                    String url = "jdbc:hive2://unknown-host";
    +                    try {
    +                        url = conn.getMetaData().getURL();
    +                    } catch (final SQLException sqle) {
    +                        // Just use the default
    +                    }
    +
    +                    // Emit a Provenance SEND event
    +                    final long transmissionMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    +                    session.getProvenanceReporter().send(flowFile, url, 
transmissionMillis, true);
    +                    session.transfer(flowFile, REL_SUCCESS);
    +
    +                } catch (final SQLException e) {
    +
    +                    if (e instanceof SQLNonTransientException) {
    +                        getLogger().error("Failed to update Hive for {} 
due to {}; routing to failure", new Object[]{flowFile, e});
    +                        session.transfer(flowFile, REL_FAILURE);
    +                    } else {
    +                        getLogger().error("Failed to update Hive for {} 
due to {}; it is possible that retrying the operation will succeed, so routing 
to retry", new Object[]{flowFile, e});
    +                        flowFile = session.penalize(flowFile);
    +                        session.transfer(flowFile, REL_RETRY);
    +                    }
    +
    +                }
    +            }
    +        } catch (final SQLException sqle) {
    +            // There was a problem getting the connection, yield and retry 
the flowfiles
    +            getLogger().error("Failed to get Hive connection due to {}; it 
is possible that retrying the operation will succeed, so routing to retry", new 
Object[]{sqle});
    +            session.transfer(flowFiles, REL_RETRY);
    +            context.yield();
    +        }
    +    }
    +
    +    /**
    +     * Determines the HiveQL statement that should be executed for the 
given FlowFile
    +     *
    +     * @param session  the session that can be used to access the given 
FlowFile
    +     * @param flowFile the FlowFile whose HiveQL statement should be 
executed
    +     * @return the HiveQL that is associated with the given FlowFile
    +     */
    +    private String getHiveQL(final ProcessSession session, final FlowFile 
flowFile, final Charset charset) {
    +        // Read the HiveQL from the FlowFile's content
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, buffer);
    +            }
    +        });
    +
    +        // Create the PreparedStatement to use for this FlowFile.
    +        return new String(buffer, charset);
    +    }
    +
    +
    +    /**
    +     * Sets all of the appropriate parameters on the given 
PreparedStatement, based on the given FlowFile attributes.
    +     *
    +     * @param stmt       the statement to set the parameters on
    +     * @param attributes the attributes from which to derive parameter 
indices, values, and types
    +     * @throws SQLException if the PreparedStatement throws a SQLException 
when the appropriate setter is called
    +     */
    +    private void setParameters(final PreparedStatement stmt, final 
Map<String, String> attributes) throws SQLException {
    +        for (final Map.Entry<String, String> entry : 
attributes.entrySet()) {
    +            final String key = entry.getKey();
    +            final Matcher matcher = 
HIVEQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
    +            if (matcher.matches()) {
    +                final int parameterIndex = 
Integer.parseInt(matcher.group(1));
    +
    +                final boolean isNumeric = 
NUMBER_PATTERN.matcher(entry.getValue()).matches();
    +                if (!isNumeric) {
    +                    throw new ProcessException("Value of the " + key + " 
attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral 
type");
    +                }
    +
    +                final int jdbcType = Integer.parseInt(entry.getValue());
    +                final String valueAttrName = "hiveql.args." + 
parameterIndex + ".value";
    +                final String parameterValue = 
attributes.get(valueAttrName);
    +
    +                try {
    +                    setParameter(stmt, valueAttrName, parameterIndex, 
parameterValue, jdbcType);
    +                } catch (final NumberFormatException nfe) {
    +                    throw new ProcessException("The value of the " + 
valueAttrName + " is '" + parameterValue + "', which cannot be converted into 
the necessary data type", nfe);
    +                }
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Determines how to map the given value to the appropriate JDBC data 
type and sets the parameter on the
    +     * provided PreparedStatement
    +     *
    +     * @param stmt           the PreparedStatement to set the parameter on
    +     * @param attrName       the name of the attribute that the parameter 
is coming from - for logging purposes
    +     * @param parameterIndex the index of the HiveQL parameter to set
    +     * @param parameterValue the value of the HiveQL parameter to set
    +     * @param jdbcType       the JDBC Type of the HiveQL parameter to set
    +     * @throws SQLException if the PreparedStatement throws a SQLException 
when calling the appropriate setter
    +     */
    +    private void setParameter(final PreparedStatement stmt, final String 
attrName, final int parameterIndex, final String parameterValue, final int 
jdbcType) throws SQLException {
    +        if (parameterValue == null) {
    +            stmt.setNull(parameterIndex, jdbcType);
    +        } else {
    +            try {
    +                switch (jdbcType) {
    +                    case Types.BIT:
    +                    case Types.BOOLEAN:
    +                        stmt.setBoolean(parameterIndex, 
Boolean.parseBoolean(parameterValue));
    +                        break;
    +                    case Types.TINYINT:
    +                        stmt.setByte(parameterIndex, 
Byte.parseByte(parameterValue));
    +                        break;
    +                    case Types.SMALLINT:
    +                        stmt.setShort(parameterIndex, 
Short.parseShort(parameterValue));
    +                        break;
    +                    case Types.INTEGER:
    +                        stmt.setInt(parameterIndex, 
Integer.parseInt(parameterValue));
    +                        break;
    +                    case Types.BIGINT:
    +                        stmt.setLong(parameterIndex, 
Long.parseLong(parameterValue));
    +                        break;
    +                    case Types.REAL:
    +                        stmt.setFloat(parameterIndex, 
Float.parseFloat(parameterValue));
    +                        break;
    +                    case Types.FLOAT:
    +                    case Types.DOUBLE:
    +                        stmt.setDouble(parameterIndex, 
Double.parseDouble(parameterValue));
    +                        break;
    +                    case Types.DECIMAL:
    +                    case Types.NUMERIC:
    +                        stmt.setBigDecimal(parameterIndex, new 
BigDecimal(parameterValue));
    +                        break;
    +                    case Types.DATE:
    +                        stmt.setDate(parameterIndex, new 
Date(Long.parseLong(parameterValue)));
    +                        break;
    +                    case Types.TIME:
    +                        stmt.setTime(parameterIndex, new 
Time(Long.parseLong(parameterValue)));
    +                        break;
    +                    case Types.TIMESTAMP:
    +                        stmt.setTimestamp(parameterIndex, new 
Timestamp(Long.parseLong(parameterValue)));
    +                        break;
    +                    case Types.CHAR:
    +                    case Types.VARCHAR:
    +                    case Types.LONGNVARCHAR:
    +                    case Types.LONGVARCHAR:
    +                        stmt.setString(parameterIndex, parameterValue);
    +                        break;
    +                    default:
    +                        stmt.setObject(parameterIndex, parameterValue, 
jdbcType);
    +                        break;
    +                }
    --- End diff --
    
    As much as I don't like the above, we know now that setObject does not 
always do what it was intended to do so we stuck with it. . . Argh. . . just 
venting ;)


> Add support for Hive JDBC / ExecuteSQL
> --------------------------------------
>
>                 Key: NIFI-981
>                 URL: https://issues.apache.org/jira/browse/NIFI-981
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Joseph Witt
>            Assignee: Matt Burgess
>
> In this mailing list thread from September 2015 "NIFI DBCP connection pool 
> not working for hive" the main thrust of the converstation is to provide 
> proper support for delivering data to hive.  Hive's jdbc driver appears to 
> have dependencies on Hadoop libraries.  We need to be careful/thoughtful 
> about how to best support this so that different versions of Hadoop distros 
> can be supported (potentially in parallel on the same flow).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to