This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new c32ea61 NIFI-6040: Fixed ExecuteSQL processors when Output Batch Size
is set
c32ea61 is described below
commit c32ea618c5f454d9af25082452595b57c064b001
Author: Matthew Burgess <[email protected]>
AuthorDate: Wed Mar 6 10:48:25 2019 -0500
NIFI-6040: Fixed ExecuteSQL processors when Output Batch Size is set
This closes #3355.
Signed-off-by: Bryan Bende <[email protected]>
---
.../processors/standard/AbstractExecuteSQL.java | 5 +
.../nifi/processors/standard/TestExecuteSQL.java | 45 +++++++
.../processors/standard/TestExecuteSQLRecord.java | 146 +++++++++++++++++----
3 files changed, 173 insertions(+), 23 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index 76e36fd..e013a5c 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -312,6 +312,11 @@ public abstract class AbstractExecuteSQL extends
AbstractProcessor {
// If we've reached the batch size, send out
the flow files
if (outputBatchSize > 0 &&
resultSetFlowFiles.size() >= outputBatchSize) {
session.transfer(resultSetFlowFiles,
REL_SUCCESS);
+ // Need to remove the original input file
if it exists
+ if (fileToProcess != null) {
+ session.remove(fileToProcess);
+ fileToProcess = null;
+ }
session.commit();
resultSetFlowFiles.clear();
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 35dfe76..5458434 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -283,6 +283,51 @@ public class TestExecuteSQL {
}
@Test
+ public void testWithOutputBatchingAndIncomingFlowFile() throws
SQLException {
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ // load test data to database
+ final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_NULL_INT");
+ } catch (final SQLException sqle) {
+ }
+
+ stmt.execute("create table TEST_NULL_INT (id integer not null, val1
integer, val2 integer, constraint my_pk primary key (id))");
+
+ for (int i = 0; i < 1000; i++) {
+ stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES ("
+ i + ", 1, 1)");
+ }
+
+ runner.setIncomingConnection(true);
+ runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
+ runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "1");
+ runner.enqueue("SELECT * FROM TEST_NULL_INT");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 200);
+ runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS,
FragmentAttributes.FRAGMENT_INDEX.key());
+ runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS,
FragmentAttributes.FRAGMENT_ID.key());
+
+ MockFlowFile firstFlowFile =
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+
+ firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
+
firstFlowFile.assertAttributeNotExists(FragmentAttributes.FRAGMENT_COUNT.key());
+
firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(),
"0");
+ firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+
+ MockFlowFile lastFlowFile =
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(199);
+
+ lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
+
lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(),
"199");
+ lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+ }
+
+ @Test
public void testMaxRowsPerFlowFile() throws SQLException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index 93997d1..6f6a091 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -170,6 +170,106 @@ public class TestExecuteSQLRecord {
}
@Test
+ public void testWithOutputBatching() throws InitializationException,
SQLException {
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ // load test data to database
+ final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_NULL_INT");
+ } catch (final SQLException sqle) {
+ }
+
+ stmt.execute("create table TEST_NULL_INT (id integer not null, val1
integer, val2 integer, constraint my_pk primary key (id))");
+
+ for (int i = 0; i < 1000; i++) {
+ stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES ("
+ i + ", 1, 1)");
+ }
+
+ MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+ runner.addControllerService("writer", recordWriter);
+ runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.enableControllerService(recordWriter);
+
+ runner.setIncomingConnection(false);
+ runner.setProperty(ExecuteSQLRecord.MAX_ROWS_PER_FLOW_FILE, "5");
+ runner.setProperty(ExecuteSQLRecord.OUTPUT_BATCH_SIZE, "5");
+ runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "SELECT * FROM
TEST_NULL_INT");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS,
200);
+
runner.assertAllFlowFilesContainAttribute(ExecuteSQLRecord.REL_SUCCESS,
FragmentAttributes.FRAGMENT_INDEX.key());
+
runner.assertAllFlowFilesContainAttribute(ExecuteSQLRecord.REL_SUCCESS,
FragmentAttributes.FRAGMENT_ID.key());
+
+ MockFlowFile firstFlowFile =
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+
+ firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT,
"5");
+
firstFlowFile.assertAttributeNotExists(FragmentAttributes.FRAGMENT_COUNT.key());
+
firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(),
"0");
+ firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX,
"0");
+
+ MockFlowFile lastFlowFile =
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(199);
+
+ lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT,
"5");
+
lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(),
"199");
+ lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX,
"0");
+ }
+
+ @Test
+ public void testWithOutputBatchingAndIncomingFlowFile() throws
InitializationException, SQLException {
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ dbLocation.delete();
+
+ // load test data to database
+ final Connection con = ((DBCPService)
runner.getControllerService("dbcp")).getConnection();
+ Statement stmt = con.createStatement();
+
+ try {
+ stmt.execute("drop table TEST_NULL_INT");
+ } catch (final SQLException sqle) {
+ }
+
+ stmt.execute("create table TEST_NULL_INT (id integer not null, val1
integer, val2 integer, constraint my_pk primary key (id))");
+
+ for (int i = 0; i < 1000; i++) {
+ stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES ("
+ i + ", 1, 1)");
+ }
+
+ MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+ runner.addControllerService("writer", recordWriter);
+ runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.enableControllerService(recordWriter);
+
+ runner.setIncomingConnection(true);
+ runner.setProperty(ExecuteSQLRecord.MAX_ROWS_PER_FLOW_FILE, "5");
+ runner.setProperty(ExecuteSQLRecord.OUTPUT_BATCH_SIZE, "1");
+ runner.enqueue("SELECT * FROM TEST_NULL_INT");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS,
200);
+
runner.assertAllFlowFilesContainAttribute(ExecuteSQLRecord.REL_SUCCESS,
FragmentAttributes.FRAGMENT_INDEX.key());
+
runner.assertAllFlowFilesContainAttribute(ExecuteSQLRecord.REL_SUCCESS,
FragmentAttributes.FRAGMENT_ID.key());
+
+ MockFlowFile firstFlowFile =
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+
+ firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT,
"5");
+
firstFlowFile.assertAttributeNotExists(FragmentAttributes.FRAGMENT_COUNT.key());
+
firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(),
"0");
+ firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX,
"0");
+
+ MockFlowFile lastFlowFile =
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(199);
+
+ lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT,
"5");
+
lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(),
"199");
+ lastFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULTSET_INDEX,
"0");
+ }
+
+ @Test
public void testMaxRowsPerFlowFile() throws Exception {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
@@ -333,7 +433,7 @@ public class TestExecuteSQLRecord {
stmt.execute("create table TEST_NULL_INT (id integer not null, val1
integer, val2 integer, constraint my_pk primary key (id))");
runner.setIncomingConnection(true);
- runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
+ runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -341,9 +441,9 @@ public class TestExecuteSQLRecord {
runner.enqueue("Hello".getBytes());
runner.run();
- runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
- MockFlowFile firstFlowFile =
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
- firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "0");
+ runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1);
+ MockFlowFile firstFlowFile =
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+ firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT,
"0");
firstFlowFile.assertContentEquals("");
}
@@ -488,8 +588,8 @@ public class TestExecuteSQLRecord {
stmt.execute("insert into TEST_NULL_INT values(1,2,3)");
runner.setIncomingConnection(true);
- runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
- runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
+ runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
+ runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -497,9 +597,9 @@ public class TestExecuteSQLRecord {
runner.enqueue("test".getBytes());
runner.run();
- runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
- MockFlowFile firstFlowFile =
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
- firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1");
+ runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1);
+ MockFlowFile firstFlowFile =
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+ firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT,
"1");
}
@Test
@@ -521,9 +621,9 @@ public class TestExecuteSQLRecord {
stmt.execute("insert into TEST_NULL_INT values(1,2,3)");
runner.setIncomingConnection(true);
- runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
- runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
- runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)");
+ runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
+ runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
+ runner.setProperty(ExecuteSQLRecord.SQL_POST_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -531,9 +631,9 @@ public class TestExecuteSQLRecord {
runner.enqueue("test".getBytes());
runner.run();
- runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
- MockFlowFile firstFlowFile =
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
- firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1");
+ runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_SUCCESS, 1);
+ MockFlowFile firstFlowFile =
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_SUCCESS).get(0);
+ firstFlowFile.assertAttributeEquals(ExecuteSQLRecord.RESULT_ROW_COUNT,
"1");
}
@Test
@@ -555,8 +655,8 @@ public class TestExecuteSQLRecord {
runner.setIncomingConnection(true);
// Simulate failure by not provide parameter
- runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
- runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
+ runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
+ runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -564,7 +664,7 @@ public class TestExecuteSQLRecord {
runner.enqueue("test".getBytes());
runner.run();
- runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
+ runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_FAILURE, 1);
}
@Test
@@ -585,10 +685,10 @@ public class TestExecuteSQLRecord {
stmt.execute("create table TEST_NULL_INT (id integer not null, val1
integer, val2 integer, constraint my_pk primary key (id))");
runner.setIncomingConnection(true);
- runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
- runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
+ runner.setProperty(ExecuteSQLRecord.SQL_PRE_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL
SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
+ runner.setProperty(ExecuteSQLRecord.SQL_SELECT_QUERY, "select * from
TEST_NULL_INT");
// Simulate failure by not provide parameter
- runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
+ runner.setProperty(ExecuteSQLRecord.SQL_POST_QUERY, "CALL
SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
@@ -596,8 +696,8 @@ public class TestExecuteSQLRecord {
runner.enqueue("test".getBytes());
runner.run();
- runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
- MockFlowFile firstFlowFile =
runner.getFlowFilesForRelationship(ExecuteSQL.REL_FAILURE).get(0);
+ runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_FAILURE, 1);
+ MockFlowFile firstFlowFile =
runner.getFlowFilesForRelationship(ExecuteSQLRecord.REL_FAILURE).get(0);
firstFlowFile.assertContentEquals("test");
}