Repository: nifi
Updated Branches:
  refs/heads/0.x b8ff203c0 -> 133b34fc7


NIFI-1973 Allow ExecuteSQL to use flow file content as SQL query

This closes #498.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/133b34fc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/133b34fc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/133b34fc

Branch: refs/heads/0.x
Commit: 133b34fc7f4d7db8abb5c02975b86c92ac818299
Parents: b8ff203
Author: Matt Burgess <[email protected]>
Authored: Mon Jun 6 11:19:32 2016 -0400
Committer: Bryan Bende <[email protected]>
Committed: Tue Jun 7 17:28:16 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ExecuteSQL.java    | 41 ++++++++++++++++++--
 .../processors/standard/TestExecuteSQL.java     | 38 +++++++++++++++---
 2 files changed, 70 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/133b34fc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 02d9f88..aecae68 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -29,11 +30,14 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.flowfile.FlowFile;
@@ -43,6 +47,7 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.JdbcCommon;
@@ -57,6 +62,7 @@ import org.apache.nifi.util.StopWatch;
         + "a timer, or cron expression, using the standard scheduling methods, 
or it can be triggered by an incoming FlowFile. "
         + "If it is triggered by an incoming FlowFile, then attributes of that 
FlowFile will be available when evaluating the "
         + "select query. FlowFile attribute 'executesql.row.count' indicates 
how many rows were selected.")
+@WritesAttribute(attribute="executesql.row.count", description = "Contains the 
number of rows returned in the select query")
 public class ExecuteSQL extends AbstractProcessor {
 
     public static final String RESULT_ROW_COUNT = "executesql.row.count";
@@ -81,8 +87,12 @@ public class ExecuteSQL extends AbstractProcessor {
 
     public static final PropertyDescriptor SQL_SELECT_QUERY = new 
PropertyDescriptor.Builder()
             .name("SQL select query")
-            .description("SQL select query")
-            .required(true)
+            .description("The SQL select query to execute. The query can be 
empty, a constant value, or built from attributes "
+                    + "using Expression Language. If this property is 
specified, it will be used regardless of the content of "
+                    + "incoming flowfiles. If this property is empty, the 
content of the incoming flow file is expected "
+                    + "to contain a valid SQL select query, to be issued by 
the processor to the database. Note that Expression "
+                    + "Language is not evaluated for flow file contents.")
+            .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(true)
             .build();
@@ -122,6 +132,17 @@ public class ExecuteSQL extends AbstractProcessor {
         return propDescriptors;
     }
 
+    @OnScheduled
+    public void setup(ProcessContext context) {
+        // If the query is not set, then an incoming flow file is needed. 
Otherwise fail the initialization
+        if (!context.getProperty(SQL_SELECT_QUERY).isSet() && 
!context.hasIncomingConnection()) {
+            final String errorString = "Either the Select Query must be 
specified or there must be an incoming connection "
+                    + "providing flowfile(s) containing a SQL select query";
+            getLogger().error(errorString);
+            throw new ProcessException(errorString);
+        }
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         FlowFile fileToProcess = null;
@@ -138,9 +159,23 @@ public class ExecuteSQL extends AbstractProcessor {
 
         final ProcessorLog logger = getLogger();
         final DBCPService dbcpService = 
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
-        final String selectQuery = 
context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
         final Integer queryTimeout = 
context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
         final StopWatch stopWatch = new StopWatch(true);
+        final String selectQuery;
+        if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
+            selectQuery = 
context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
+        } else {
+            // If the query is not set, then an incoming flow file is 
required, and expected to contain a valid SQL select query.
+            // If there is no incoming connection, onTrigger will not be 
called as the processor will fail when scheduled.
+            final StringBuilder queryContents = new StringBuilder();
+            session.read(fileToProcess, new InputStreamCallback() {
+                @Override
+                public void process(InputStream in) throws IOException {
+                    queryContents.append(IOUtils.toString(in));
+                }
+            });
+            selectQuery = queryContents.toString();
+        }
 
         try (final Connection con = dbcpService.getConnection();
             final Statement st = con.createStatement()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/133b34fc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 4da9b1f..5e2a64a 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -108,20 +108,39 @@ public class TestExecuteSQL {
     }
 
     @Test
+    public void testIncomingConnectionWithNoFlowFileAndNoQuery() throws 
InitializationException {
+        runner.setIncomingConnection(true);
+        runner.run();
+        runner.assertTransferCount(ExecuteSQL.REL_SUCCESS, 0);
+        runner.assertTransferCount(ExecuteSQL.REL_FAILURE, 0);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void testNoIncomingConnectionAndNoQuery() throws 
InitializationException {
+        runner.setIncomingConnection(false);
+        runner.run();
+    }
+
+    @Test
     public void testNoIncomingConnection() throws ClassNotFoundException, 
SQLException, InitializationException, IOException {
         runner.setIncomingConnection(false);
-        invokeOnTrigger(null, QUERY_WITHOUT_EL, false);
+        invokeOnTrigger(null, QUERY_WITHOUT_EL, false, true);
     }
 
     @Test
     public void testNoTimeLimit() throws InitializationException, 
ClassNotFoundException, SQLException, IOException {
-        invokeOnTrigger(null, QUERY_WITH_EL, true);
+        invokeOnTrigger(null, QUERY_WITH_EL, true, true);
+    }
+
+    @Test
+    public void testSelectQueryInFlowFile() throws InitializationException, 
ClassNotFoundException, SQLException, IOException {
+        invokeOnTrigger(null, QUERY_WITHOUT_EL, true, false);
     }
 
     @Test
     public void testQueryTimeout() throws InitializationException, 
ClassNotFoundException, SQLException, IOException {
         // Does to seem to have any effect when using embedded Derby
-        invokeOnTrigger(1, QUERY_WITH_EL, true); // 1 second max time
+        invokeOnTrigger(1, QUERY_WITH_EL, true, true); // 1 second max time
     }
 
     @Test
@@ -177,7 +196,7 @@ public class TestExecuteSQL {
         runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
     }
 
-    public void invokeOnTrigger(final Integer queryTimeout, final String 
query, final boolean incomingFlowFile)
+    public void invokeOnTrigger(final Integer queryTimeout, final String 
query, final boolean incomingFlowFile, final boolean setQueryProperty)
         throws InitializationException, ClassNotFoundException, SQLException, 
IOException {
 
         if (queryTimeout != null) {
@@ -196,13 +215,20 @@ public class TestExecuteSQL {
         // ResultSet size will be 1x200x100 = 20 000 rows
         // because of where PER.ID = ${person.id}
         final int nrOfRows = 20000;
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query);
 
         if (incomingFlowFile) {
             // incoming FlowFile content is not used, but attributes are used
             final Map<String, String> attributes = new HashMap<>();
             attributes.put("person.id", "10");
-            runner.enqueue("Hello".getBytes(), attributes);
+            if (!setQueryProperty) {
+                runner.enqueue(query.getBytes(), attributes);
+            } else {
+                runner.enqueue("Hello".getBytes(), attributes);
+            }
+        }
+
+        if(setQueryProperty) {
+            runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query);
         }
 
         runner.run();

Reply via email to