Repository: nifi
Updated Branches:
  refs/heads/master c5a174ae6 -> aa6bc5d50


NIFI-1409: Fix missing transfer on error in ExecuteSql

Signed-off-by: jpercivall <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/aa6bc5d5
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/aa6bc5d5
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/aa6bc5d5

Branch: refs/heads/master
Commit: aa6bc5d505dde3078505199b8aae5eb2c399086a
Parents: c5a174a
Author: Matt Burgess <[email protected]>
Authored: Wed Jan 27 14:01:50 2016 -0500
Committer: jpercivall <[email protected]>
Committed: Wed Jan 27 18:23:32 2016 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/ExecuteSQL.java    | 106 +++++++++++--------
 .../processors/standard/TestExecuteSQL.java     |  25 +++++
 2 files changed, 84 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/aa6bc5d5/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index f241592..02d9f88 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -53,50 +53,49 @@ import org.apache.nifi.util.StopWatch;
 @InputRequirement(Requirement.INPUT_ALLOWED)
 @Tags({"sql", "select", "jdbc", "query", "database"})
 @CapabilityDescription("Execute provided SQL select query. Query result will 
be converted to Avro format."
-    + " Streaming is used so arbitrarily large result sets are supported. This 
processor can be scheduled to run on " +
-    "a timer, or cron expression, using the standard scheduling methods, or it 
can be triggered by an incoming FlowFile. " +
-    "If it is triggered by an incoming FlowFile, then attributes of that 
FlowFile will be available when evaluating the " +
-    "select query. " +
-    "FlowFile attribute 'executesql.row.count' indicates how many rows were 
selected.")
+        + " Streaming is used so arbitrarily large result sets are supported. 
This processor can be scheduled to run on "
+        + "a timer, or cron expression, using the standard scheduling methods, 
or it can be triggered by an incoming FlowFile. "
+        + "If it is triggered by an incoming FlowFile, then attributes of that 
FlowFile will be available when evaluating the "
+        + "select query. FlowFile attribute 'executesql.row.count' indicates 
how many rows were selected.")
 public class ExecuteSQL extends AbstractProcessor {
 
     public static final String RESULT_ROW_COUNT = "executesql.row.count";
 
     // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
-        .name("success")
-        .description("Successfully created FlowFile from SQL query result 
set.")
-        .build();
+            .name("success")
+            .description("Successfully created FlowFile from SQL query result 
set.")
+            .build();
     public static final Relationship REL_FAILURE = new Relationship.Builder()
-        .name("failure")
-        .description("SQL query execution failed. Incoming FlowFile will be 
penalized and routed to this relationship")
-        .build();
+            .name("failure")
+            .description("SQL query execution failed. Incoming FlowFile will 
be penalized and routed to this relationship")
+            .build();
     private final Set<Relationship> relationships;
 
     public static final PropertyDescriptor DBCP_SERVICE = new 
PropertyDescriptor.Builder()
-        .name("Database Connection Pooling Service")
-        .description("The Controller Service that is used to obtain connection 
to database")
-        .required(true)
-        .identifiesControllerService(DBCPService.class)
-        .build();
+            .name("Database Connection Pooling Service")
+            .description("The Controller Service that is used to obtain 
connection to database")
+            .required(true)
+            .identifiesControllerService(DBCPService.class)
+            .build();
 
     public static final PropertyDescriptor SQL_SELECT_QUERY = new 
PropertyDescriptor.Builder()
-        .name("SQL select query")
-        .description("SQL select query")
-        .required(true)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(true)
-        .build();
+            .name("SQL select query")
+            .description("SQL select query")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
 
     public static final PropertyDescriptor QUERY_TIMEOUT = new 
PropertyDescriptor.Builder()
-        .name("Max Wait Time")
-        .description("The maximum amount of time allowed for a running SQL 
select query "
-            + " , zero means there is no limit. Max time less than 1 second 
will be equal to zero.")
-        .defaultValue("0 seconds")
-        .required(true)
-        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-        .sensitive(false)
-        .build();
+            .name("Max Wait Time")
+            .description("The maximum amount of time allowed for a running SQL 
select query "
+                    + " , zero means there is no limit. Max time less than 1 
second will be equal to zero.")
+            .defaultValue("0 seconds")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .sensitive(false)
+            .build();
 
     private final List<PropertyDescriptor> propDescriptors;
 
@@ -125,36 +124,36 @@ public class ExecuteSQL extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        FlowFile incoming = null;
+        FlowFile fileToProcess = null;
         if (context.hasIncomingConnection()) {
-            incoming = session.get();
+            fileToProcess = session.get();
 
             // If we have no FlowFile, and all incoming connections are 
self-loops then we can continue on.
             // However, if we have no FlowFile and we have connections coming 
from other Processors, then
             // we know that we should run only if we have a FlowFile.
-            if (incoming == null && context.hasNonLoopConnection()) {
+            if (fileToProcess == null && context.hasNonLoopConnection()) {
                 return;
             }
         }
 
         final ProcessorLog logger = getLogger();
-
         final DBCPService dbcpService = 
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
-        final String selectQuery = 
context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(incoming).getValue();
+        final String selectQuery = 
context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
         final Integer queryTimeout = 
context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
-
         final StopWatch stopWatch = new StopWatch(true);
 
         try (final Connection con = dbcpService.getConnection();
             final Statement st = con.createStatement()) {
             st.setQueryTimeout(queryTimeout); // timeout in seconds
             final LongHolder nrOfRows = new LongHolder(0L);
-            FlowFile outgoing = (incoming == null ? session.create() : 
incoming);
-            outgoing = session.write(outgoing, new OutputStreamCallback() {
+            if (fileToProcess == null) {
+                fileToProcess = session.create();
+            }
+            fileToProcess = session.write(fileToProcess, new 
OutputStreamCallback() {
                 @Override
                 public void process(final OutputStream out) throws IOException 
{
                     try {
-                        logger.debug("Executing query {}", new Object[] 
{selectQuery});
+                        logger.debug("Executing query {}", new 
Object[]{selectQuery});
                         final ResultSet resultSet = 
st.executeQuery(selectQuery);
                         nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, 
out));
                     } catch (final SQLException e) {
@@ -164,17 +163,30 @@ public class ExecuteSQL extends AbstractProcessor {
             });
 
             // set attribute how many rows were selected
-            outgoing = session.putAttribute(outgoing, RESULT_ROW_COUNT, 
nrOfRows.get().toString());
+            fileToProcess = session.putAttribute(fileToProcess, 
RESULT_ROW_COUNT, nrOfRows.get().toString());
 
-            logger.info("{} contains {} Avro records; transferring to 
'success'", new Object[] {outgoing, nrOfRows.get()});
-            session.getProvenanceReporter().modifyContent(outgoing, "Retrieved 
" + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-            session.transfer(outgoing, REL_SUCCESS);
+            logger.info("{} contains {} Avro records; transferring to 
'success'",
+                    new Object[]{fileToProcess, nrOfRows.get()});
+            session.getProvenanceReporter().modifyContent(fileToProcess, 
"Retrieved " + nrOfRows.get() + " rows",
+                    stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.transfer(fileToProcess, REL_SUCCESS);
         } catch (final ProcessException | SQLException e) {
-            if (incoming == null) {
-                logger.error("Unable to execute SQL select query {} due to {}. 
No incoming flow file to route to failure", new Object[] {selectQuery, e});
+            if (fileToProcess == null) {
+                // This can happen if any exceptions occur while setting up 
the connection, statement, etc.
+                logger.error("Unable to execute SQL select query {} due to {}. 
No FlowFile to route to failure",
+                        new Object[]{selectQuery, e});
+                context.yield();
             } else {
-                logger.error("Unable to execute SQL select query {} for {} due 
to {}; routing to failure", new Object[] {selectQuery, incoming, e});
-                session.transfer(incoming, REL_FAILURE);
+                if (context.hasIncomingConnection()) {
+                    logger.error("Unable to execute SQL select query {} for {} 
due to {}; routing to failure",
+                            new Object[]{selectQuery, fileToProcess, e});
+                    fileToProcess = session.penalize(fileToProcess);
+                } else {
+                    logger.error("Unable to execute SQL select query {} due to 
{}; routing to failure",
+                            new Object[]{selectQuery, e});
+                    context.yield();
+                }
+                session.transfer(fileToProcess, REL_FAILURE);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa6bc5d5/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index e381706..4da9b1f 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -152,6 +152,31 @@ public class TestExecuteSQL {
         
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT,
 "2");
     }
 
+    @Test
+    public void testWithSqlException() throws SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NO_ROWS");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NO_ROWS (id integer)");
+
+        runner.setIncomingConnection(false);
+        // Try a valid SQL statment that will generate an error (val1 does not 
exist, e.g.)
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT val1 FROM 
TEST_NO_ROWS");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
+    }
+
     public void invokeOnTrigger(final Integer queryTimeout, final String 
query, final boolean incomingFlowFile)
         throws InitializationException, ClassNotFoundException, SQLException, 
IOException {
 

Reply via email to