Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2695#discussion_r194283856
  
    --- Diff: 
nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
 ---
    @@ -443,31 +481,72 @@ private void onTrigger(final ProcessContext context, 
final ProcessSession sessio
                     throw e;
                 }
     
    -            session.transfer(resultSetFlowFiles, REL_SUCCESS);
    +            failure = executeConfigStatements(con, postQueries);
    +            if (failure != null) {
    +                hqlStatement = failure.getLeft();
    +                if (resultSetFlowFiles != null) {
    +                    resultSetFlowFiles.forEach(ff -> session.remove(ff));
    +                }
    +                flowfile = (fileToProcess == null) ? session.create() : 
fileToProcess;
    +                fileToProcess = null;
    +                throw failure.getRight();
    +            }
     
    +            session.transfer(resultSetFlowFiles, REL_SUCCESS);
    +            if (fileToProcess != null) {
    +                session.remove(fileToProcess);
    +            }
             } catch (final ProcessException | SQLException e) {
    -            logger.error("Issue processing SQL {} due to {}.", new 
Object[]{selectQuery, e});
    +            logger.error("Issue processing SQL {} due to {}.", new 
Object[]{hqlStatement, e});
                 if (flowfile == null) {
                     // This can happen if any exceptions occur while setting 
up the connection, statement, etc.
                     logger.error("Unable to execute HiveQL select query {} due 
to {}. No FlowFile to route to failure",
    -                        new Object[]{selectQuery, e});
    +                        new Object[]{hqlStatement, e});
                     context.yield();
                 } else {
                     if (context.hasIncomingConnection()) {
                         logger.error("Unable to execute HiveQL select query {} 
for {} due to {}; routing to failure",
    -                            new Object[]{selectQuery, flowfile, e});
    +                            new Object[]{hqlStatement, flowfile, e});
                         flowfile = session.penalize(flowfile);
                     } else {
                         logger.error("Unable to execute HiveQL select query {} 
due to {}; routing to failure",
    -                            new Object[]{selectQuery, e});
    +                            new Object[]{hqlStatement, e});
                         context.yield();
                     }
                     session.transfer(flowfile, REL_FAILURE);
                 }
    -        } finally {
    -            if (fileToProcess != null) {
    -                session.remove(fileToProcess);
    +        }
    +    }
    +
    +    /*
    +     * Executes given queries using pre-defined connection.
    +     * Returns null on success, or a pair of the failed query string and the SQLException it raised.
    +     */
    +    protected Pair<String,SQLException> executeConfigStatements(final 
Connection con, final List<String> configQueries){
    +        if (configQueries == null || configQueries.isEmpty()) {
    +            return null;
    +        }
    +
    +        for (String confSQL : configQueries) {
    +            try(final Statement st = con.createStatement()){
    +                st.executeQuery(confSQL);
    --- End diff --
    
    Shouldn't this be st.execute()? When I run with a single "set" statement, I 
get a SQLException saying that the query did not return a result set...


---

Reply via email to