Repository: nifi
Updated Branches:
  refs/heads/master fd35b8ffd -> c59087bc3


NIFI-1049: Added unit test to verify that reported bug was fixed; corrected a 
typo in log message


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/07c619cf
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/07c619cf
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/07c619cf

Branch: refs/heads/master
Commit: 07c619cf40affef136e61f87d2075853e392bfc3
Parents: 0435911
Author: Mark Payne <[email protected]>
Authored: Fri Oct 23 10:36:55 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Mon Nov 30 15:37:58 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/ExecuteSQL.java    | 16 +++---
 .../processors/standard/TestExecuteSQL.java     | 57 +++++++++++++++-----
 2 files changed, 50 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/07c619cf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 9aa9d59..f241592 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -51,14 +51,13 @@ import org.apache.nifi.util.StopWatch;
 
 @EventDriven
 @InputRequirement(Requirement.INPUT_ALLOWED)
-@Tags({ "sql", "select", "jdbc", "query", "database" })
+@Tags({"sql", "select", "jdbc", "query", "database"})
 @CapabilityDescription("Execute provided SQL select query. Query result will 
be converted to Avro format."
     + " Streaming is used so arbitrarily large result sets are supported. This 
processor can be scheduled to run on " +
-        "a timer, or cron expression, using the standard scheduling methods, 
or it can be triggered by an incoming FlowFile. " +
-        "If it is triggered by an incoming FlowFile, then attributes of that 
FlowFile will be available when evaluating the " +
-        "select query. " +
-        "FlowFile attribute 'executesql.row.count' indicates how many rows 
were selected."
-        )
+    "a timer, or cron expression, using the standard scheduling methods, or it 
can be triggered by an incoming FlowFile. " +
+    "If it is triggered by an incoming FlowFile, then attributes of that 
FlowFile will be available when evaluating the " +
+    "select query. " +
+    "FlowFile attribute 'executesql.row.count' indicates how many rows were 
selected.")
 public class ExecuteSQL extends AbstractProcessor {
 
     public static final String RESULT_ROW_COUNT = "executesql.row.count";
@@ -155,7 +154,7 @@ public class ExecuteSQL extends AbstractProcessor {
                 @Override
                 public void process(final OutputStream out) throws IOException 
{
                     try {
-                        logger.debug("Executing query {}", new Object[] { 
selectQuery });
+                        logger.debug("Executing query {}", new Object[] 
{selectQuery});
                         final ResultSet resultSet = 
st.executeQuery(selectQuery);
                         nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, 
out));
                     } catch (final SQLException e) {
@@ -167,8 +166,7 @@ public class ExecuteSQL extends AbstractProcessor {
             // set attribute how many rows were selected
             outgoing = session.putAttribute(outgoing, RESULT_ROW_COUNT, 
nrOfRows.get().toString());
 
-            logger.info("{} contains {} Avro records", new Object[] { 
nrOfRows.get() });
-            logger.info("Transferred {} to 'success'", new Object[] { outgoing 
});
+            logger.info("{} contains {} Avro records; transferring to 
'success'", new Object[] {outgoing, nrOfRows.get()});
             session.getProvenanceReporter().modifyContent(outgoing, "Retrieved 
" + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
             session.transfer(outgoing, REL_SUCCESS);
         } catch (final ProcessException | SQLException e) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/07c619cf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 95a5210..89ed47f 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -63,20 +64,20 @@ public class TestExecuteSQL {
     final static String DB_LOCATION = "target/db";
 
     final static String QUERY_WITH_EL = "select "
-            + "  PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as 
PersonCode"
-            + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as 
ProductCode"
-            + ", REL.ID as RelId,    REL.NAME as RelName,    REL.CODE as 
RelCode"
-            + ", ROW_NUMBER() OVER () as rownr "
-            + " from persons PER, products PRD, relationships REL"
-            + " where PER.ID = ${person.id}";
+        + "  PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as 
PersonCode"
+        + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as 
ProductCode"
+        + ", REL.ID as RelId,    REL.NAME as RelName,    REL.CODE as RelCode"
+        + ", ROW_NUMBER() OVER () as rownr "
+        + " from persons PER, products PRD, relationships REL"
+        + " where PER.ID = ${person.id}";
 
     final static String QUERY_WITHOUT_EL = "select "
-            + "  PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as 
PersonCode"
-            + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as 
ProductCode"
-            + ", REL.ID as RelId,    REL.NAME as RelName,    REL.CODE as 
RelCode"
-            + ", ROW_NUMBER() OVER () as rownr "
-            + " from persons PER, products PRD, relationships REL"
-            + " where PER.ID = 10";
+        + "  PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as 
PersonCode"
+        + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as 
ProductCode"
+        + ", REL.ID as RelId,    REL.NAME as RelName,    REL.CODE as RelCode"
+        + ", ROW_NUMBER() OVER () as rownr "
+        + " from persons PER, products PRD, relationships REL"
+        + " where PER.ID = 10";
 
 
     @BeforeClass
@@ -123,8 +124,36 @@ public class TestExecuteSQL {
         invokeOnTrigger(1, QUERY_WITH_EL, true); // 1 second max time
     }
 
+    @Test
+    public void testWithNullIntColumn() throws SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
+
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, 
NULL, 1)");
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 
1)");
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM 
TEST_NULL_INT");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT,
 "2");
+    }
+
     public void invokeOnTrigger(final Integer queryTimeout, final String 
query, final boolean incomingFlowFile)
-            throws InitializationException, ClassNotFoundException, 
SQLException, IOException {
+        throws InitializationException, ClassNotFoundException, SQLException, 
IOException {
 
         if (queryTimeout != null) {
             runner.setProperty(ExecuteSQL.QUERY_TIMEOUT, 
queryTimeout.toString() + " secs");
@@ -135,7 +164,7 @@ public class TestExecuteSQL {
         dbLocation.delete();
 
         // load test data to database
-        final Connection con = 
((DBCPService)runner.getControllerService("dbcp")).getConnection();
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
         TestJdbcHugeStream.loadTestData2Database(con, 100, 2000, 1000);
         LOGGER.info("test data loaded");
 

Reply via email to