[ 
https://issues.apache.org/jira/browse/NIFI-981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260758#comment-15260758
 ] 

ASF GitHub Bot commented on NIFI-981:
-------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/384#discussion_r61317929
  
    --- Diff: 
nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
 ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.util.hive;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.SchemaBuilder;
    +import org.apache.avro.SchemaBuilder.FieldAssembler;
    +import org.apache.avro.file.DataFileWriter;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.generic.GenericDatumWriter;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.DatumWriter;
    +import org.apache.commons.lang3.StringUtils;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.math.BigDecimal;
    +import java.math.BigInteger;
    +import java.nio.ByteBuffer;
    +import java.sql.ResultSet;
    +import java.sql.ResultSetMetaData;
    +import java.sql.SQLException;
    +
    +import static java.sql.Types.ARRAY;
    +import static java.sql.Types.BIGINT;
    +import static java.sql.Types.BINARY;
    +import static java.sql.Types.BIT;
    +import static java.sql.Types.BLOB;
    +import static java.sql.Types.BOOLEAN;
    +import static java.sql.Types.CHAR;
    +import static java.sql.Types.CLOB;
    +import static java.sql.Types.DATE;
    +import static java.sql.Types.DECIMAL;
    +import static java.sql.Types.DOUBLE;
    +import static java.sql.Types.FLOAT;
    +import static java.sql.Types.INTEGER;
    +import static java.sql.Types.LONGNVARCHAR;
    +import static java.sql.Types.LONGVARBINARY;
    +import static java.sql.Types.LONGVARCHAR;
    +import static java.sql.Types.NCHAR;
    +import static java.sql.Types.NUMERIC;
    +import static java.sql.Types.NVARCHAR;
    +import static java.sql.Types.REAL;
    +import static java.sql.Types.ROWID;
    +import static java.sql.Types.SMALLINT;
    +import static java.sql.Types.TIME;
    +import static java.sql.Types.TIMESTAMP;
    +import static java.sql.Types.TINYINT;
    +import static java.sql.Types.VARBINARY;
    +import static java.sql.Types.VARCHAR;
    +
    +/**
    + * JDBC / HiveQL common functions.
    + */
    +public class HiveJdbcCommon {
    +
    +    public static long convertToAvroStream(final ResultSet rs, final 
OutputStream outStream) throws SQLException, IOException {
    +        return convertToAvroStream(rs, outStream, null, null);
    +    }
    +
    +
    +    public static long convertToAvroStream(final ResultSet rs, final 
OutputStream outStream, String recordName, ResultSetRowCallback callback)
    +            throws SQLException, IOException {
    +        final Schema schema = createSchema(rs, recordName);
    +        final GenericRecord rec = new GenericData.Record(schema);
    +
    +        final DatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(schema);
    +        try (final DataFileWriter<GenericRecord> dataFileWriter = new 
DataFileWriter<>(datumWriter)) {
    +            dataFileWriter.create(schema, outStream);
    +
    +            final ResultSetMetaData meta = rs.getMetaData();
    +            final int nrOfColumns = meta.getColumnCount();
    +            long nrOfRows = 0;
    +            while (rs.next()) {
    +                if (callback != null) {
    +                    callback.processRow(rs);
    +                }
    +                for (int i = 1; i <= nrOfColumns; i++) {
    +                    final int javaSqlType = meta.getColumnType(i);
    +                    final Object value = rs.getObject(i);
    +
    +                    if (value == null) {
    +                        rec.put(i - 1, null);
    +
    +                    } else if (javaSqlType == BINARY || javaSqlType == 
VARBINARY || javaSqlType == LONGVARBINARY || javaSqlType == ARRAY || 
javaSqlType == BLOB || javaSqlType == CLOB) {
    +                        // bytes requires little bit different handling
    +                        byte[] bytes = rs.getBytes(i);
    +                        ByteBuffer bb = ByteBuffer.wrap(bytes);
    +                        rec.put(i - 1, bb);
    +
    +                    } else if (value instanceof Byte) {
    +                        // tinyint(1) type is returned by JDBC driver as 
java.sql.Types.TINYINT
    +                        // But value is returned by JDBC as java.lang.Byte
    +                        // (at least H2 JDBC works this way)
    +                        // direct put to avro record results:
    +                        // org.apache.avro.AvroRuntimeException: Unknown 
datum type java.lang.Byte
    +                        rec.put(i - 1, ((Byte) value).intValue());
    +
    +                    } else if (value instanceof BigDecimal || value 
instanceof BigInteger) {
    +                        // Avro can't handle BigDecimal and BigInteger as 
numbers - it will throw an AvroRuntimeException such as: "Unknown datum type: 
java.math.BigDecimal: 38"
    +                        rec.put(i - 1, value.toString());
    +
    +                    } else if (value instanceof Number || value instanceof 
Boolean) {
    +                        rec.put(i - 1, value);
    +
    +                    } else {
    +                        // The different types that we support are numbers 
(int, long, double, float),
    +                        // as well as boolean values and Strings. Since 
Avro doesn't provide
    +                        // timestamp types, we want to convert those to 
Strings. So we will cast anything other
    +                        // than numbers or booleans to strings by using 
the toString() method.
    +                        rec.put(i - 1, value.toString());
    +                    }
    +                }
    +                dataFileWriter.append(rec);
    +                nrOfRows += 1;
    +            }
    +
    +            return nrOfRows;
    +        }
    +    }
    +
    +    public static Schema createSchema(final ResultSet rs) throws 
SQLException {
    +        return createSchema(rs, null);
    +    }
    +
    +    /**
    +     * Creates an Avro schema from a result set. If the table/record name 
is known a priori and provided, use that as a
    +     * fallback for the record name if it cannot be retrieved from the 
result set, and finally fall back to a default value.
    +     *
    +     * @param rs         The result set to convert to Avro
    +     * @param recordName The a priori record name to use if it cannot be 
determined from the result set.
    +     * @return A Schema object representing the result set converted to an 
Avro record
    +     * @throws SQLException if any error occurs during conversion
    +     */
    +    public static Schema createSchema(final ResultSet rs, String 
recordName) throws SQLException {
    +        final ResultSetMetaData meta = rs.getMetaData();
    +        final int nrOfColumns = meta.getColumnCount();
    +        String tableName = StringUtils.isEmpty(recordName) ? 
"NiFi_ExecuteSQL_Record" : recordName;
    --- End diff --
    
    Probably want this to be NiFi_HiveQL_Record, or something like that


> Add support for Hive JDBC / ExecuteSQL
> --------------------------------------
>
>                 Key: NIFI-981
>                 URL: https://issues.apache.org/jira/browse/NIFI-981
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Joseph Witt
>            Assignee: Matt Burgess
>
> In this mailing list thread from September 2015 "NIFI DBCP connection pool 
> not working for hive" the main thrust of the converstation is to provide 
> proper support for delivering data to hive.  Hive's jdbc driver appears to 
> have dependencies on Hadoop libraries.  We need to be careful/thoughtful 
> about how to best support this so that different versions of Hadoop distros 
> can be supported (potentially in parallel on the same flow).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to