Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2695#discussion_r194283856
--- Diff:
nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
---
@@ -443,31 +481,72 @@ private void onTrigger(final ProcessContext context,
final ProcessSession sessio
throw e;
}
- session.transfer(resultSetFlowFiles, REL_SUCCESS);
+ failure = executeConfigStatements(con, postQueries);
+ if (failure != null) {
+ hqlStatement = failure.getLeft();
+ if (resultSetFlowFiles != null) {
+ resultSetFlowFiles.forEach(ff -> session.remove(ff));
+ }
+ flowfile = (fileToProcess == null) ? session.create() :
fileToProcess;
+ fileToProcess = null;
+ throw failure.getRight();
+ }
+ session.transfer(resultSetFlowFiles, REL_SUCCESS);
+ if (fileToProcess != null) {
+ session.remove(fileToProcess);
+ }
} catch (final ProcessException | SQLException e) {
- logger.error("Issue processing SQL {} due to {}.", new
Object[]{selectQuery, e});
+ logger.error("Issue processing SQL {} due to {}.", new
Object[]{hqlStatement, e});
if (flowfile == null) {
// This can happen if any exceptions occur while setting
up the connection, statement, etc.
logger.error("Unable to execute HiveQL select query {} due
to {}. No FlowFile to route to failure",
- new Object[]{selectQuery, e});
+ new Object[]{hqlStatement, e});
context.yield();
} else {
if (context.hasIncomingConnection()) {
logger.error("Unable to execute HiveQL select query {}
for {} due to {}; routing to failure",
- new Object[]{selectQuery, flowfile, e});
+ new Object[]{hqlStatement, flowfile, e});
flowfile = session.penalize(flowfile);
} else {
logger.error("Unable to execute HiveQL select query {}
due to {}; routing to failure",
- new Object[]{selectQuery, e});
+ new Object[]{hqlStatement, e});
context.yield();
}
session.transfer(flowfile, REL_FAILURE);
}
- } finally {
- if (fileToProcess != null) {
- session.remove(fileToProcess);
+ }
+ }
+
+ /*
+ * Executes given queries using pre-defined connection.
+ * Returns null on success, or a pair of the failed query string and the SQLException it raised.
+ */
+ protected Pair<String,SQLException> executeConfigStatements(final
Connection con, final List<String> configQueries){
+ if (configQueries == null || configQueries.isEmpty()) {
+ return null;
+ }
+
+ for (String confSQL : configQueries) {
+ try(final Statement st = con.createStatement()){
+ st.executeQuery(confSQL);
--- End diff --
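Shouldn't this be st.execute() rather than st.executeQuery()? When I run with
a single "set" statement I get a SQLException saying that the query did not
return a result set. executeQuery() requires the statement to produce a
ResultSet, which configuration statements such as "set" do not.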
---