[
https://issues.apache.org/jira/browse/NIFI-981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260754#comment-15260754
]
ASF GitHub Bot commented on NIFI-981:
-------------------------------------
Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/384#discussion_r61317547
--- Diff:
nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ExecuteHiveQL.java
---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.hive.HiveDBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.LongHolder;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.hive.HiveJdbcCommon;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@EventDriven
+@InputRequirement(Requirement.INPUT_ALLOWED)
+@Tags({"hive", "sql", "select", "jdbc", "query", "database"})
+@CapabilityDescription("Execute provided HiveQL SELECT query against a
Hive database connection. Query result will be converted to Avro format."
+ + " Streaming is used so arbitrarily large result sets are
supported. This processor can be scheduled to run on "
+ + "a timer, or cron expression, using the standard scheduling
methods, or it can be triggered by an incoming FlowFile. "
+ + "If it is triggered by an incoming FlowFile, then attributes of
that FlowFile will be available when evaluating the "
+ + "select query. FlowFile attribute 'executehiveql.row.count'
indicates how many rows were selected.")
+public class ExecuteHiveQL extends AbstractHiveQLProcessor {
+
+ public static final String RESULT_ROW_COUNT =
"executehiveql.row.count";
+
+ // Relationships
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("Successfully created FlowFile from HiveQL query
result set.")
+ .build();
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("HiveQL query execution failed. Incoming FlowFile
will be penalized and routed to this relationship")
+ .build();
+
+
+ public static final PropertyDescriptor HIVEQL_SELECT_QUERY = new
PropertyDescriptor.Builder()
+ .name("hive-query")
+ .displayName("HiveQL select query")
+ .description("HiveQL select query")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ private final static List<PropertyDescriptor> propertyDescriptors;
+ private final static Set<Relationship> relationships;
+
+ /*
+ * Will ensure that the list of property descriptors is build only
once.
+ * Will also create a Set of relationships
+ */
+ static {
+ List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+ _propertyDescriptors.add(HIVE_DBCP_SERVICE);
+ _propertyDescriptors.add(HIVEQL_SELECT_QUERY);
+ propertyDescriptors =
Collections.unmodifiableList(_propertyDescriptors);
+
+ Set<Relationship> _relationships = new HashSet<>();
+ _relationships.add(REL_SUCCESS);
+ _relationships.add(REL_FAILURE);
+ relationships = Collections.unmodifiableSet(_relationships);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ FlowFile fileToProcess = null;
+ if (context.hasIncomingConnection()) {
+ fileToProcess = session.get();
+
+ // If we have no FlowFile, and all incoming connections are
self-loops then we can continue on.
+ // However, if we have no FlowFile and we have connections
coming from other Processors, then
+ // we know that we should run only if we have a FlowFile.
+ if (fileToProcess == null && context.hasNonLoopConnection()) {
+ return;
+ }
+ }
+
+ final ProcessorLog logger = getLogger();
+ final HiveDBCPService dbcpService =
context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
+ final String selectQuery =
context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
+ final StopWatch stopWatch = new StopWatch(true);
+
+ try (final Connection con = dbcpService.getConnection();
+ final Statement st = con.createStatement()) {
+ final LongHolder nrOfRows = new LongHolder(0L);
+ if (fileToProcess == null) {
+ fileToProcess = session.create();
+ }
+ fileToProcess = session.write(fileToProcess, new
OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws
IOException {
+ try {
+ logger.debug("Executing query {}", new
Object[]{selectQuery});
+ final ResultSet resultSet =
st.executeQuery(selectQuery);
+
nrOfRows.set(HiveJdbcCommon.convertToAvroStream(resultSet, out));
+ } catch (final SQLException e) {
+ throw new ProcessException(e);
+ }
+ }
+ });
+
+ // set attribute how many rows were selected
+ fileToProcess = session.putAttribute(fileToProcess,
RESULT_ROW_COUNT, nrOfRows.get().toString());
+
+ logger.info("{} contains {} Avro records; transferring to
'success'",
+ new Object[]{fileToProcess, nrOfRows.get()});
+ session.getProvenanceReporter().modifyContent(fileToProcess,
"Retrieved " + nrOfRows.get() + " rows",
--- End diff --
Good point, will make the change
> Add support for Hive JDBC / ExecuteSQL
> --------------------------------------
>
> Key: NIFI-981
> URL: https://issues.apache.org/jira/browse/NIFI-981
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Joseph Witt
> Assignee: Matt Burgess
>
> In this mailing list thread from September 2015 "NIFI DBCP connection pool
> not working for hive" the main thrust of the conversation is to provide
> proper support for delivering data to Hive. Hive's JDBC driver appears to
> have dependencies on Hadoop libraries. We need to be careful/thoughtful
> about how to best support this so that different versions of Hadoop distros
> can be supported (potentially in parallel on the same flow).
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)