GitHub user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/384#discussion_r61317929
  
    --- Diff: 
nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
 ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.util.hive;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.SchemaBuilder;
    +import org.apache.avro.SchemaBuilder.FieldAssembler;
    +import org.apache.avro.file.DataFileWriter;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.generic.GenericDatumWriter;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.DatumWriter;
    +import org.apache.commons.lang3.StringUtils;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.math.BigDecimal;
    +import java.math.BigInteger;
    +import java.nio.ByteBuffer;
    +import java.sql.ResultSet;
    +import java.sql.ResultSetMetaData;
    +import java.sql.SQLException;
    +
    +import static java.sql.Types.ARRAY;
    +import static java.sql.Types.BIGINT;
    +import static java.sql.Types.BINARY;
    +import static java.sql.Types.BIT;
    +import static java.sql.Types.BLOB;
    +import static java.sql.Types.BOOLEAN;
    +import static java.sql.Types.CHAR;
    +import static java.sql.Types.CLOB;
    +import static java.sql.Types.DATE;
    +import static java.sql.Types.DECIMAL;
    +import static java.sql.Types.DOUBLE;
    +import static java.sql.Types.FLOAT;
    +import static java.sql.Types.INTEGER;
    +import static java.sql.Types.LONGNVARCHAR;
    +import static java.sql.Types.LONGVARBINARY;
    +import static java.sql.Types.LONGVARCHAR;
    +import static java.sql.Types.NCHAR;
    +import static java.sql.Types.NUMERIC;
    +import static java.sql.Types.NVARCHAR;
    +import static java.sql.Types.REAL;
    +import static java.sql.Types.ROWID;
    +import static java.sql.Types.SMALLINT;
    +import static java.sql.Types.TIME;
    +import static java.sql.Types.TIMESTAMP;
    +import static java.sql.Types.TINYINT;
    +import static java.sql.Types.VARBINARY;
    +import static java.sql.Types.VARCHAR;
    +
    +/**
    + * JDBC / HiveQL common functions.
    + */
    +public class HiveJdbcCommon {
    +
    +    /**
    +     * Convenience overload that converts a result set to an Avro data file without an
    +     * a priori record name and without a per-row callback.
    +     *
    +     * @param rs        the result set whose remaining rows are written
    +     * @param outStream the stream that receives the Avro data file
    +     * @return the number of rows written
    +     * @throws SQLException if reading from the result set fails
    +     * @throws IOException  if writing the Avro data fails
    +     */
    +    public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException {
    +        final String noRecordName = null;
    +        final ResultSetRowCallback noCallback = null;
    +        return convertToAvroStream(rs, outStream, noRecordName, noCallback);
    +    }
    +
    +
    +    /**
    +     * Writes every remaining row of a result set to an output stream as an Avro data file.
    +     * The Avro schema is derived from the result set via {@code createSchema(rs, recordName)}.
    +     *
    +     * @param rs         the result set to read; it is advanced with {@code next()} until exhausted
    +     * @param outStream  the stream that receives the Avro data file
    +     * @param recordName the a priori record name handed to {@code createSchema}; may be null
    +     * @param callback   optional hook invoked for each row before that row is converted; may be null
    +     * @return the number of rows written
    +     * @throws SQLException if reading from the result set fails
    +     * @throws IOException  if writing the Avro data fails
    +     */
    +    public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback)
    +            throws SQLException, IOException {
    +        final Schema schema = createSchema(rs, recordName);
    +        // A single record instance is reused for every row; every column is overwritten per row,
    +        // so no values leak from one row into the next.
    +        final GenericRecord rec = new GenericData.Record(schema);
    +
    +        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
    +        try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
    +            dataFileWriter.create(schema, outStream);
    +
    +            final ResultSetMetaData meta = rs.getMetaData();
    +            final int nrOfColumns = meta.getColumnCount();
    +            long nrOfRows = 0;
    +            while (rs.next()) {
    +                if (callback != null) {
    +                    callback.processRow(rs);
    +                }
    +                for (int i = 1; i <= nrOfColumns; i++) {
    +                    final int javaSqlType = meta.getColumnType(i);
    +                    final Object value = rs.getObject(i);
    +
    +                    if (value == null) {
    +                        rec.put(i - 1, null);
    +
    +                    } else if (javaSqlType == BINARY || javaSqlType == VARBINARY || javaSqlType == LONGVARBINARY || javaSqlType == ARRAY || javaSqlType == BLOB || javaSqlType == CLOB) {
    +                        // Byte-oriented columns are re-read with getBytes() and wrapped in a ByteBuffer,
    +                        // which is the representation Avro generic data uses for bytes.
    +                        byte[] bytes = rs.getBytes(i);
    +                        ByteBuffer bb = ByteBuffer.wrap(bytes);
    +                        rec.put(i - 1, bb);
    +
    +                    } else if (value instanceof Byte || value instanceof Short) {
    +                        // Avro has no 8- or 16-bit integer types. tinyint(1) is reported as
    +                        // java.sql.Types.TINYINT, but its value comes back as java.lang.Byte
    +                        // (at least H2 JDBC works this way), and drivers may return
    +                        // java.lang.Short for SMALLINT/TINYINT. Putting either directly into the
    +                        // avro record results in:
    +                        // org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte
    +                        // so both are widened to int.
    +                        rec.put(i - 1, ((Number) value).intValue());
    +
    +                    } else if (value instanceof BigDecimal || value instanceof BigInteger) {
    +                        // Avro can't handle BigDecimal and BigInteger as numbers - it will throw an AvroRuntimeException such as: "Unknown datum type: java.math.BigDecimal: 38"
    +                        rec.put(i - 1, value.toString());
    +
    +                    } else if (value instanceof Number || value instanceof Boolean) {
    +                        rec.put(i - 1, value);
    +
    +                    } else {
    +                        // The different types that we support are numbers (int, long, double, float),
    +                        // as well as boolean values and Strings. Since Avro doesn't provide
    +                        // timestamp types, we want to convert those to Strings. So we will cast anything other
    +                        // than numbers or booleans to strings by using the toString() method.
    +                        rec.put(i - 1, value.toString());
    +                    }
    +                }
    +                dataFileWriter.append(rec);
    +                nrOfRows += 1;
    +            }
    +
    +            return nrOfRows;
    +        }
    +    }
    +
    +    /**
    +     * Creates an Avro schema from a result set without an a priori record name, so the
    +     * record name must come from the result set or the built-in default.
    +     *
    +     * @param rs the result set whose metadata describes the schema
    +     * @return a Schema object representing the result set converted to an Avro record
    +     * @throws SQLException if any error occurs during conversion
    +     */
    +    public static Schema createSchema(final ResultSet rs) throws SQLException {
    +        final String unknownRecordName = null;
    +        return createSchema(rs, unknownRecordName);
    +    }
    +
    +    /**
    +     * Creates an Avro schema from a result set. If the table/record name 
is known a priori and provided, use that as a
    +     * fallback for the record name if it cannot be retrieved from the 
result set, and finally fall back to a default value.
    +     *
    +     * @param rs         The result set to convert to Avro
    +     * @param recordName The a priori record name to use if it cannot be 
determined from the result set.
    +     * @return A Schema object representing the result set converted to an 
Avro record
    +     * @throws SQLException if any error occurs during conversion
    +     */
    +    public static Schema createSchema(final ResultSet rs, String 
recordName) throws SQLException {
    +        final ResultSetMetaData meta = rs.getMetaData();
    +        final int nrOfColumns = meta.getColumnCount();
    +        String tableName = StringUtils.isEmpty(recordName) ? 
"NiFi_ExecuteSQL_Record" : recordName;
    --- End diff --
    
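    This should probably be NiFi_HiveQL_Record (or something similar) rather than NiFi_ExecuteSQL_Record, since this default lives in the Hive bundle's HiveJdbcCommon, not in ExecuteSQL.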


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and you would like it enabled, or if the feature is enabled but not
working, please contact infrastructure at infrastructure@apache.org or file a
JIRA ticket with INFRA.
---

Reply via email to