This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.9.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 04dc1d6de2062c00c95551b6273b9b04c586de5a
Author: Matthew Burgess <[email protected]>
AuthorDate: Wed Mar 6 10:48:25 2019 -0500

    NIFI-6040: Fixed ExecuteSQL processors when Output Batch Size is set

    This closes #3355.

    Signed-off-by: Bryan Bende <[email protected]>
---
 .../processors/standard/AbstractExecuteSQL.java    |   5 +
 .../nifi/processors/standard/TestExecuteSQL.java   |  45 +++++++
 .../processors/standard/TestExecuteSQLRecord.java  | 146 +++++++++++++++++----
 3 files changed, 173 insertions(+), 23 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index 76e36fd..e013a5c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -312,6 +312,11 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
                             // If we've reached the batch size, send out the flow files
                             if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
                                 session.transfer(resultSetFlowFiles, REL_SUCCESS);
+                                // Need to remove the original input file if it exists
+                                if (fileToProcess != null) {
+                                    session.remove(fileToProcess);
+                                    fileToProcess = null;
+                                }
                                 session.commit();
                                 resultSetFlowFiles.clear();
                             }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 35dfe76..5458434 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -283,6 +283,51 @@ public class TestExecuteSQL {
     }

     @Test
+    public void testWithOutputBatchingAndIncomingFlowFile() throws SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        for (int i = 0; i < 1000; i++) {
+            stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
+        }
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
+        runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "1");
+        runner.enqueue("SELECT * FROM TEST_NULL_INT");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 200);
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_INDEX.key());
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_ID.key());
+
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
+        firstFlowFile.assertAttributeNotExists(FragmentAttributes.FRAGMENT_COUNT.key());
+        firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "0");
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+
+        MockFlowFile
lastFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(199);
+
+        lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
+        lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
+        lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+    }
+
+    @Test
     public void testMaxRowsPerFlowFile() throws SQLException {
         // remove previous test database, if any
         final File dbLocation = new File(DB_LOCATION);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index b0f5cda..3d172ca 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -157,6 +157,106 @@ public class TestExecuteSQLRecord {
     }

     @Test
+    public void testWithOutputBatching() throws InitializationException, SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        for (int i = 0; i < 1000; i++) {
+            stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
+        }
+
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer",
recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(ExecuteSQLRecord.MAX_ROWS_PER_FLOW_FILE, "5");
+        runner.setProperty(ExecuteSQLRecord.OUTPUT_BATCH_SIZE, "5");
+        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 200);
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQLRecord.REL_SUCCESS, FragmentAttributes.FRAGMENT_INDEX.key());
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQLRecord.REL_SUCCESS, FragmentAttributes.FRAGMENT_ID.key());
+
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "5");
+        firstFlowFile.assertAttributeNotExists(FragmentAttributes.FRAGMENT_COUNT.key());
+        firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "0");
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX, "0");
+
+        MockFlowFile lastFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(199);
+
+        lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "5");
+        lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
+        lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX, "0");
+    }
+
+    @Test
+    public void testWithOutputBatchingAndIncomingFlowFile() throws InitializationException, SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        for (int i = 0; i < 1000; i++) {
+            stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
+        }
+
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+
+        runner.setIncomingConnection(true);
+        runner.setProperty(ExecuteSQLRecord.MAX_ROWS_PER_FLOW_FILE, "5");
+        runner.setProperty(ExecuteSQLRecord.OUTPUT_BATCH_SIZE, "1");
+        runner.enqueue("SELECT * FROM TEST_NULL_INT");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 200);
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQLRecord.REL_SUCCESS, FragmentAttributes.FRAGMENT_INDEX.key());
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQLRecord.REL_SUCCESS, FragmentAttributes.FRAGMENT_ID.key());
+
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "5");
+        firstFlowFile.assertAttributeNotExists(FragmentAttributes.FRAGMENT_COUNT.key());
+        firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "0");
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX, "0");
+
+        MockFlowFile lastFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(199);
+
+        lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "5");
+        lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
+        lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX, "0");
+    }
+
+    @Test
     public void testMaxRowsPerFlowFile() throws Exception {
         // remove previous test database, if any
         final File dbLocation = new File(DB_LOCATION);
@@
-257,7 +357,7 @@ public class TestExecuteSQLRecord {
         stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");

         runner.setIncomingConnection(true);
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -265,9 +365,9 @@ public class TestExecuteSQLRecord {
         runner.enqueue("Hello".getBytes());
         runner.run();

-        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
-        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
-        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "0");
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1);
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "0");
         firstFlowFile.assertContentEquals("");
     }

@@ -412,8 +512,8 @@ public class TestExecuteSQLRecord {
         stmt.execute("insert into TEST_NULL_INT values(1,2,3)");

         runner.setIncomingConnection(true);
-        runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
+        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -421,9 +521,9 @@ public class TestExecuteSQLRecord {
         runner.enqueue("test".getBytes());
         runner.run();

-        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
-        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
-        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1");
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1);
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "1");
     }

     @Test
@@ -445,9 +545,9 @@ public class TestExecuteSQLRecord {
         stmt.execute("insert into TEST_NULL_INT values(1,2,3)");

         runner.setIncomingConnection(true);
-        runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
-        runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)");
+        runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
+        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -455,9 +555,9 @@ public class TestExecuteSQLRecord {
         runner.enqueue("test".getBytes());
         runner.run();

-        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
-        MockFlowFile firstFlowFile =
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
-        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1");
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1);
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+        firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT, "1");
     }

     @Test
@@ -479,8 +579,8 @@ public class TestExecuteSQLRecord {

         runner.setIncomingConnection(true);
         // Simulate failure by not provide parameter
-        runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
+        runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -488,7 +588,7 @@ public class TestExecuteSQLRecord {
         runner.enqueue("test".getBytes());
         runner.run();

-        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_FAILURE, 1);
     }

     @Test
@@ -509,10 +609,10 @@ public class TestExecuteSQLRecord {
         stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");

         runner.setIncomingConnection(true);
-        runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
-        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
+        runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
+
runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
         // Simulate failure by not provide parameter
-        runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
+        runner.setProperty(ExecuteSQLRecord.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
         MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
         runner.addControllerService("writer", recordWriter);
         runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -520,8 +620,8 @@ public class TestExecuteSQLRecord {
         runner.enqueue("test".getBytes());
         runner.run();

-        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
-        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_FAILURE).get(0);
+        runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_FAILURE, 1);
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_FAILURE).get(0);
         firstFlowFile.assertContentEquals("test");
     }
