This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new c32ea61  NIFI-6040: Fixed ExecuteSQL processors when Output Batch Size is set
c32ea61 is described below

commit c32ea618c5f454d9af25082452595b57c064b001
Author: Matthew Burgess <[email protected]>
AuthorDate: Wed Mar 6 10:48:25 2019 -0500

    NIFI-6040: Fixed ExecuteSQL processors when Output Batch Size is set
    
    This closes #3355.
    
    Signed-off-by: Bryan Bende <[email protected]>
---
 .../processors/standard/AbstractExecuteSQL.java    |   5 +
 .../nifi/processors/standard/TestExecuteSQL.java   |  45 +++++++
 .../processors/standard/TestExecuteSQLRecord.java  | 146 +++++++++++++++++----
 3 files changed, 173 insertions(+), 23 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index 76e36fd..e013a5c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -312,6 +312,11 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
                                 // If we've reached the batch size, send out 
the flow files
                                 if (outputBatchSize > 0 && 
resultSetFlowFiles.size() >= outputBatchSize) {
                                     session.transfer(resultSetFlowFiles, 
REL_SUCCESS);
+                                    // Need to remove the original input file 
if it exists
+                                    if (fileToProcess != null) {
+                                        session.remove(fileToProcess);
+                                        fileToProcess = null;
+                                    }
                                     session.commit();
                                     resultSetFlowFiles.clear();
                                 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 35dfe76..5458434 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -283,6 +283,51 @@ public class TestExecuteSQL {
     }
 
     @Test
+    public void testWithOutputBatchingAndIncomingFlowFile() throws 
SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
+
+        for (int i = 0; i < 1000; i++) {
+            stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" 
+ i + ", 1, 1)");
+        }
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
+        runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "1");
+        runner.enqueue("SELECT * FROM TEST_NULL_INT");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 200);
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, 
FragmentAttributes.FRAGMENT_INDEX.key());
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, 
FragmentAttributes.FRAGMENT_ID.key());
+
+        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
+        
firstFlowFile.assertAttributeNotExists(FragmentAttributes.FRAGMENT_COUNT.key());
+        
firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), 
"0");
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+
+        MockFlowFile lastFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(199);
+
+        lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
+        
lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), 
"199");
+        lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+    }
+
+    @Test
     public void testMaxRowsPerFlowFile() throws SQLException {
         // remove previous test database, if any
         final File dbLocation = new File(DB_LOCATION);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index 93997d1..6f6a091 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -170,6 +170,106 @@ public class TestExecuteSQLRecord {
     }
 
     @Test
+    public void testWithOutputBatching() throws InitializationException, 
SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
+
+        for (int i = 0; i < 1000; i++) {
+            stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" 
+ i + ", 1, 1)");
+        }
+
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(ExecuteSQLRecord.MAX_ROWS_PER_FLOW_FILE, "5");
+        runner.setProperty(ExecuteSQLRecord.OUTPUT_BATCH_SIZE, "5");
+        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "SELECT * FROM 
TEST_NULL_INT");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 
200);
+        
runner.assertAllFlowFilesContainAttribute(ExecuteSQLRecord.REL_SUCCESS, 
FragmentAttributes.FRAGMENT_INDEX.key());
+        
runner.assertAllFlowFilesContainAttribute(ExecuteSQLRecord.REL_SUCCESS, 
FragmentAttributes.FRAGMENT_ID.key());
+
+        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, 
"5");
+        
firstFlowFile.assertAttributeNotExists(FragmentAttributes.FRAGMENT_COUNT.key());
+        
firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), 
"0");
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX, 
"0");
+
+        MockFlowFile lastFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(199);
+
+        lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, 
"5");
+        
lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), 
"199");
+        lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX, 
"0");
+    }
+
+    @Test
+    public void testWithOutputBatchingAndIncomingFlowFile() throws 
InitializationException, SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
+
+        for (int i = 0; i < 1000; i++) {
+            stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" 
+ i + ", 1, 1)");
+        }
+
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQLRecord.MAX_ROWS_PER_FLOW_FILE, "5");
+        runner.setProperty(ExecuteSQLRecord.OUTPUT_BATCH_SIZE, "1");
+        runner.enqueue("SELECT * FROM TEST_NULL_INT");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 
200);
+        
runner.assertAllFlowFilesContainAttribute(ExecuteSQLRecord.REL_SUCCESS, 
FragmentAttributes.FRAGMENT_INDEX.key());
+        
runner.assertAllFlowFilesContainAttribute(ExecuteSQLRecord.REL_SUCCESS, 
FragmentAttributes.FRAGMENT_ID.key());
+
+        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, 
"5");
+        
firstFlowFile.assertAttributeNotExists(FragmentAttributes.FRAGMENT_COUNT.key());
+        
firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), 
"0");
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX, 
"0");
+
+        MockFlowFile lastFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(199);
+
+        lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, 
"5");
+        
lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), 
"199");
+        lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX, 
"0");
+    }
+
+    @Test
     public void testMaxRowsPerFlowFile() throws Exception {
         // remove previous test database, if any
         final File dbLocation = new File(DB_LOCATION);
@@ -333,7 +433,7 @@ public class TestExecuteSQLRecord {
         stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
 
         runner.setIncomingConnection(true);
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -341,9 +441,9 @@ public class TestExecuteSQLRecord {
         runner.enqueue("Hello".getBytes());
         runner.run();
 
-        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
-        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
-        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "0");
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1);
+        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, 
"0");
         firstFlowFile.assertContentEquals("");
     }
 
@@ -488,8 +588,8 @@ public class TestExecuteSQLRecord {
         stmt.execute("insert into TEST_NULL_INT values(1,2,3)");
 
         runner.setIncomingConnection(true);
-        runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
+        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -497,9 +597,9 @@ public class TestExecuteSQLRecord {
         runner.enqueue("test".getBytes());
         runner.run();
 
-        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
-        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
-        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1");
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1);
+        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, 
"1");
     }
 
     @Test
@@ -521,9 +621,9 @@ public class TestExecuteSQLRecord {
         stmt.execute("insert into TEST_NULL_INT values(1,2,3)");
 
         runner.setIncomingConnection(true);
-        runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
-        runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)");
+        runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
+        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_POST_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -531,9 +631,9 @@ public class TestExecuteSQLRecord {
         runner.enqueue("test".getBytes());
         runner.run();
 
-        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
-        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
-        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1");
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1);
+        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, 
"1");
     }
 
     @Test
@@ -555,8 +655,8 @@ public class TestExecuteSQLRecord {
 
         runner.setIncomingConnection(true);
         // Simulate failure by not provide parameter
-        runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
+        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -564,7 +664,7 @@ public class TestExecuteSQLRecord {
         runner.enqueue("test".getBytes());
         runner.run();
 
-        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_FAILURE, 1);
     }
 
     @Test
@@ -585,10 +685,10 @@ public class TestExecuteSQLRecord {
         stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
 
         runner.setIncomingConnection(true);
-        runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL 
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
+        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from 
TEST_NULL_INT");
         // Simulate failure by not provide parameter
-        runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
+        runner.setProperty(ExecuteSQLRecord.SQL_POST_QUERY, "CALL 
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -596,8 +696,8 @@ public class TestExecuteSQLRecord {
         runner.enqueue("test".getBytes());
         runner.run();
 
-        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
-        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQL.REL_FAILURE).get(0);
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_FAILURE, 1);
+        MockFlowFile firstFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_FAILURE).get(0);
         firstFlowFile.assertContentEquals("test");
     }
 

Reply via email to