[
https://issues.apache.org/jira/browse/NIFI-981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260758#comment-15260758
]
ASF GitHub Bot commented on NIFI-981:
-------------------------------------
Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/384#discussion_r61317929
--- Diff:
nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java
---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.hive;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.SchemaBuilder.FieldAssembler;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+import static java.sql.Types.ARRAY;
+import static java.sql.Types.BIGINT;
+import static java.sql.Types.BINARY;
+import static java.sql.Types.BIT;
+import static java.sql.Types.BLOB;
+import static java.sql.Types.BOOLEAN;
+import static java.sql.Types.CHAR;
+import static java.sql.Types.CLOB;
+import static java.sql.Types.DATE;
+import static java.sql.Types.DECIMAL;
+import static java.sql.Types.DOUBLE;
+import static java.sql.Types.FLOAT;
+import static java.sql.Types.INTEGER;
+import static java.sql.Types.LONGNVARCHAR;
+import static java.sql.Types.LONGVARBINARY;
+import static java.sql.Types.LONGVARCHAR;
+import static java.sql.Types.NCHAR;
+import static java.sql.Types.NUMERIC;
+import static java.sql.Types.NVARCHAR;
+import static java.sql.Types.REAL;
+import static java.sql.Types.ROWID;
+import static java.sql.Types.SMALLINT;
+import static java.sql.Types.TIME;
+import static java.sql.Types.TIMESTAMP;
+import static java.sql.Types.TINYINT;
+import static java.sql.Types.VARBINARY;
+import static java.sql.Types.VARCHAR;
+
+/**
+ * JDBC / HiveQL common functions.
+ */
+public class HiveJdbcCommon {
+
+ public static long convertToAvroStream(final ResultSet rs, final
OutputStream outStream) throws SQLException, IOException {
+ return convertToAvroStream(rs, outStream, null, null);
+ }
+
+
+ public static long convertToAvroStream(final ResultSet rs, final
OutputStream outStream, String recordName, ResultSetRowCallback callback)
+ throws SQLException, IOException {
+ final Schema schema = createSchema(rs, recordName);
+ final GenericRecord rec = new GenericData.Record(schema);
+
+ final DatumWriter<GenericRecord> datumWriter = new
GenericDatumWriter<>(schema);
+ try (final DataFileWriter<GenericRecord> dataFileWriter = new
DataFileWriter<>(datumWriter)) {
+ dataFileWriter.create(schema, outStream);
+
+ final ResultSetMetaData meta = rs.getMetaData();
+ final int nrOfColumns = meta.getColumnCount();
+ long nrOfRows = 0;
+ while (rs.next()) {
+ if (callback != null) {
+ callback.processRow(rs);
+ }
+ for (int i = 1; i <= nrOfColumns; i++) {
+ final int javaSqlType = meta.getColumnType(i);
+ final Object value = rs.getObject(i);
+
+ if (value == null) {
+ rec.put(i - 1, null);
+
+ } else if (javaSqlType == BINARY || javaSqlType ==
VARBINARY || javaSqlType == LONGVARBINARY || javaSqlType == ARRAY ||
javaSqlType == BLOB || javaSqlType == CLOB) {
+ // bytes requires little bit different handling
+ byte[] bytes = rs.getBytes(i);
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+ rec.put(i - 1, bb);
+
+ } else if (value instanceof Byte) {
+ // tinyint(1) type is returned by JDBC driver as
java.sql.Types.TINYINT
+ // But value is returned by JDBC as java.lang.Byte
+ // (at least H2 JDBC works this way)
+ // direct put to avro record results:
+ // org.apache.avro.AvroRuntimeException: Unknown
datum type java.lang.Byte
+ rec.put(i - 1, ((Byte) value).intValue());
+
+ } else if (value instanceof BigDecimal || value
instanceof BigInteger) {
+ // Avro can't handle BigDecimal and BigInteger as
numbers - it will throw an AvroRuntimeException such as: "Unknown datum type:
java.math.BigDecimal: 38"
+ rec.put(i - 1, value.toString());
+
+ } else if (value instanceof Number || value instanceof
Boolean) {
+ rec.put(i - 1, value);
+
+ } else {
+ // The different types that we support are numbers
(int, long, double, float),
+ // as well as boolean values and Strings. Since
Avro doesn't provide
+ // timestamp types, we want to convert those to
Strings. So we will cast anything other
+ // than numbers or booleans to strings by using
the toString() method.
+ rec.put(i - 1, value.toString());
+ }
+ }
+ dataFileWriter.append(rec);
+ nrOfRows += 1;
+ }
+
+ return nrOfRows;
+ }
+ }
+
+ public static Schema createSchema(final ResultSet rs) throws
SQLException {
+ return createSchema(rs, null);
+ }
+
+ /**
+ * Creates an Avro schema from a result set. If the table/record name
is known a priori and provided, use that as a
+ * fallback for the record name if it cannot be retrieved from the
result set, and finally fall back to a default value.
+ *
+ * @param rs The result set to convert to Avro
+ * @param recordName The a priori record name to use if it cannot be
determined from the result set.
+ * @return A Schema object representing the result set converted to an
Avro record
+ * @throws SQLException if any error occurs during conversion
+ */
+ public static Schema createSchema(final ResultSet rs, String
recordName) throws SQLException {
+ final ResultSetMetaData meta = rs.getMetaData();
+ final int nrOfColumns = meta.getColumnCount();
+ String tableName = StringUtils.isEmpty(recordName) ?
"NiFi_ExecuteSQL_Record" : recordName;
--- End diff --
Probably want this to be NiFi_HiveQL_Record, or something like that
> Add support for Hive JDBC / ExecuteSQL
> --------------------------------------
>
> Key: NIFI-981
> URL: https://issues.apache.org/jira/browse/NIFI-981
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Joseph Witt
> Assignee: Matt Burgess
>
> In this mailing list thread from September 2015 "NIFI DBCP connection pool
> not working for hive" the main thrust of the conversation is to provide
> proper support for delivering data to Hive. Hive's JDBC driver appears to
> have dependencies on Hadoop libraries. We need to be careful/thoughtful
> about how to best support this so that different versions of Hadoop distros
> can be supported (potentially in parallel on the same flow).
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)