Repository: nifi Updated Branches: refs/heads/master be0949570 -> 75906226a
NIFI-5780 Add pre and post statements to ExecuteSQL and ExecuteSQLRecord Signed-off-by: Peter Wicks <patric...@gmail.com> This closes #3156. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/75906226 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/75906226 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/75906226 Branch: refs/heads/master Commit: 75906226a6f265acc7d87414ff477a5b0646b6d8 Parents: be09495 Author: yjhyjhyjh0 <yjhyjhy...@gmail.com> Authored: Thu Nov 8 00:25:50 2018 +0800 Committer: Peter Wicks <patric...@gmail.com> Committed: Thu Nov 15 13:18:31 2018 -0700 ---------------------------------------------------------------------- .../nifi/processors/hive/SelectHiveQL.java | 4 +- .../processors/standard/AbstractExecuteSQL.java | 79 +++++++++- .../nifi/processors/standard/ExecuteSQL.java | 2 + .../processors/standard/ExecuteSQLRecord.java | 2 + .../processors/standard/TestExecuteSQL.java | 146 +++++++++++++++++++ .../standard/TestExecuteSQLRecord.java | 132 +++++++++++++++++ 6 files changed, 362 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/75906226/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java index 5342c09..3b8576b 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java @@ -109,7 +109,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { public static final PropertyDescriptor HIVEQL_PRE_QUERY = new PropertyDescriptor.Builder() .name("hive-pre-query") .displayName("HiveQL Pre-Query") - .description("HiveQL pre-query to execute. Semicolon-delimited list of queries. " + .description("A semicolon-delimited list of queries executed before the main SQL query is executed. " + "Example: 'set tez.queue.name=queue1; set hive.exec.orc.split.strategy=ETL; set hive.exec.reducers.bytes.per.reducer=1073741824'. " + "Note, the results/outputs of these queries will be suppressed if successfully executed.") .required(false) @@ -129,7 +129,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { public static final PropertyDescriptor HIVEQL_POST_QUERY = new PropertyDescriptor.Builder() .name("hive-post-query") .displayName("HiveQL Post-Query") - .description("HiveQL post-query to execute. Semicolon-delimited list of queries. " + .description("A semicolon-delimited list of queries executed after the main SQL query is executed. " + "Note, the results/outputs of these queries will be suppressed if successfully executed.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) http://git-wip-us.apache.org/repos/asf/nifi/blob/75906226/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java index bf46549..d1fabef 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.standard; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.dbcp.DBCPService; @@ -44,6 +45,7 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -82,6 +84,17 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { .identifiesControllerService(DBCPService.class) .build(); + public static final PropertyDescriptor SQL_PRE_QUERY = new PropertyDescriptor.Builder() + .name("sql-pre-query") + .displayName("SQL Pre-Query") + .description("A semicolon-delimited list of queries executed before the main SQL query is executed. " + + "For example, set session properties before main query. " + + "Results/outputs from these queries will be suppressed if there are no errors.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder() .name("SQL select query") .description("The SQL select query to execute. The query can be empty, a constant value, or built from attributes " @@ -94,6 +107,17 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); + public static final PropertyDescriptor SQL_POST_QUERY = new PropertyDescriptor.Builder() + .name("sql-post-query") + .displayName("SQL Post-Query") + .description("A semicolon-delimited list of queries executed after the main SQL query is executed. " + + "Example like setting session properties after main query. " + + "Results/outputs from these queries will be suppressed if there are no errors.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder() .name("Max Wait Time") .description("The maximum amount of time allowed for a running SQL select query " @@ -177,10 +201,12 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger(); final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField; + List<String> preQueries = getQueries(context.getProperty(SQL_PRE_QUERY).evaluateAttributeExpressions(fileToProcess).getValue()); + List<String> postQueries = getQueries(context.getProperty(SQL_POST_QUERY).evaluateAttributeExpressions(fileToProcess).getValue()); SqlWriter sqlWriter = configureSqlWriter(session, context, fileToProcess); - final String selectQuery; + String selectQuery; if (context.getProperty(SQL_SELECT_QUERY).isSet()) { selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); } else { @@ -196,6 +222,14 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { final PreparedStatement st = con.prepareStatement(selectQuery)) { st.setQueryTimeout(queryTimeout); // timeout in seconds + // Execute pre-query, throw exception and cleanup Flow Files if fail + Pair<String,SQLException> failure = executeConfigStatements(con, preQueries); + if (failure != null) { + // In case of failure, assigning config query to "selectQuery" to follow current error handling + selectQuery = failure.getLeft(); + throw failure.getRight(); + } + if (fileToProcess != null) { JdbcCommon.setParameters(st, fileToProcess.getAttributes()); } @@ -317,6 +351,14 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { } } + // Execute post-query, throw exception and cleanup Flow Files if fail + failure = executeConfigStatements(con, postQueries); + if (failure != null) { + selectQuery = failure.getLeft(); + resultSetFlowFiles.forEach(ff -> session.remove(ff)); + throw failure.getRight(); + } + // Transfer any remaining files to SUCCESS session.transfer(resultSetFlowFiles, REL_SUCCESS); resultSetFlowFiles.clear(); @@ -365,5 +407,40 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { } } + /* + * Executes given queries using pre-defined connection. + * Returns null on success, or a query string if failed. + */ + protected Pair<String,SQLException> executeConfigStatements(final Connection con, final List<String> configQueries){ + if (configQueries == null || configQueries.isEmpty()) { + return null; + } + + for (String confSQL : configQueries) { + try(final Statement st = con.createStatement()){ + st.execute(confSQL); + } catch (SQLException e) { + return Pair.of(confSQL, e); + } + } + return null; + } + + /* + * Extract list of queries from config property + */ + protected List<String> getQueries(final String value) { + if (value == null || value.length() == 0 || value.trim().length() == 0) { + return null; + } + final List<String> queries = new LinkedList<>(); + for (String query : value.split(";")) { + if (query.trim().length() > 0) { + queries.add(query.trim()); + } + } + return queries; + } + protected abstract SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess); } http://git-wip-us.apache.org/repos/asf/nifi/blob/75906226/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index cc6d508..9c61793 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -114,7 +114,9 @@ public class ExecuteSQL extends AbstractExecuteSQL { final List<PropertyDescriptor> pds = new ArrayList<>(); pds.add(DBCP_SERVICE); + pds.add(SQL_PRE_QUERY); pds.add(SQL_SELECT_QUERY); + pds.add(SQL_POST_QUERY); pds.add(QUERY_TIMEOUT); pds.add(NORMALIZE_NAMES_FOR_AVRO); pds.add(USE_AVRO_LOGICAL_TYPES); http://git-wip-us.apache.org/repos/asf/nifi/blob/75906226/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java index 31d0ec8..5a84458 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java @@ -121,7 +121,9 @@ public class ExecuteSQLRecord extends AbstractExecuteSQL { final List<PropertyDescriptor> pds = new ArrayList<>(); pds.add(DBCP_SERVICE); + pds.add(SQL_PRE_QUERY); pds.add(SQL_SELECT_QUERY); + pds.add(SQL_POST_QUERY); pds.add(QUERY_TIMEOUT); pds.add(RECORD_WRITER_FACTORY); pds.add(NORMALIZE_NAMES); http://git-wip-us.apache.org/repos/asf/nifi/blob/75906226/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java index 63de91a..199bd94 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -544,7 +544,153 @@ public class TestExecuteSQL { } } + @Test + public void testPreQuery() throws Exception { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NULL_INT"); + } catch (final SQLException sqle) { + } + + stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))"); + stmt.execute("insert into TEST_NULL_INT values(1,2,3)"); + + runner.setIncomingConnection(true); + runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)"); + runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT"); + runner.enqueue("test".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); + MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0); + firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1"); + + final InputStream in = new ByteArrayInputStream(firstFlowFile.toByteArray()); + final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); + try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) { + GenericRecord record = null; + long recordsFromStream = 0; + while (dataFileReader.hasNext()) { + // Reuse record object by passing it to next(). This saves us from + // allocating and garbage collecting many objects for files with + // many items. + record = dataFileReader.next(record); + recordsFromStream += 1; + } + + assertEquals(1, recordsFromStream); + } + } + + @Test + public void testPostQuery() throws Exception { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NULL_INT"); + } catch (final SQLException sqle) { + } + + stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))"); + stmt.execute("insert into TEST_NULL_INT values(1,2,3)"); + + runner.setIncomingConnection(true); + runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)"); + runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT"); + runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)"); + runner.enqueue("test".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); + MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0); + firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1"); + + final InputStream in = new ByteArrayInputStream(firstFlowFile.toByteArray()); + final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); + try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) { + GenericRecord record = null; + long recordsFromStream = 0; + while (dataFileReader.hasNext()) { + // Reuse record object by passing it to next(). This saves us from + // allocating and garbage collecting many objects for files with + // many items. + record = dataFileReader.next(record); + recordsFromStream += 1; + } + + assertEquals(1, recordsFromStream); + } + } + + @Test + public void testPreQueryFail() throws Exception { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NULL_INT"); + } catch (final SQLException sqle) { + } + + stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))"); + + runner.setIncomingConnection(true); + // Simulate failure by not provide parameter + runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()"); + runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT"); + runner.enqueue("test".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1); + } + + @Test + public void testPostQueryFail() throws Exception { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NULL_INT"); + } catch (final SQLException sqle) { + } + stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))"); + + runner.setIncomingConnection(true); + runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)"); + runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT"); + // Simulate failure by not provide parameter + runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()"); + runner.enqueue("test".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1); + MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_FAILURE).get(0); + firstFlowFile.assertContentEquals("test"); + } /** * Simple implementation only for ExecuteSQL processor testing. http://git-wip-us.apache.org/repos/asf/nifi/blob/75906226/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java index 04c4c00..03cdbfc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java @@ -350,6 +350,138 @@ public class TestExecuteSQLRecord { assertEquals(durationTime, fetchTime + executionTime); } + @Test + public void testPreQuery() throws Exception { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NULL_INT"); + } catch (final SQLException sqle) { + } + + stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))"); + stmt.execute("insert into TEST_NULL_INT values(1,2,3)"); + + runner.setIncomingConnection(true); + runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)"); + runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT"); + MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1); + runner.addControllerService("writer", recordWriter); + runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer"); + runner.enableControllerService(recordWriter); + runner.enqueue("test".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); + MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0); + firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1"); + } + + @Test + public void testPostQuery() throws Exception { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NULL_INT"); + } catch (final SQLException sqle) { + } + + stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))"); + stmt.execute("insert into TEST_NULL_INT values(1,2,3)"); + + runner.setIncomingConnection(true); + runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)"); + runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT"); + runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)"); + MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1); + runner.addControllerService("writer", recordWriter); + runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer"); + runner.enableControllerService(recordWriter); + runner.enqueue("test".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); + MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0); + firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "1"); + } + + @Test + public void testPreQueryFail() throws Exception { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NULL_INT"); + } catch (final SQLException sqle) { + } + + stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))"); + + runner.setIncomingConnection(true); + // Simulate failure by not provide parameter + runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()"); + runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT"); + MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1); + runner.addControllerService("writer", recordWriter); + runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer"); + runner.enableControllerService(recordWriter); + runner.enqueue("test".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1); + } + + @Test + public void testPostQueryFail() throws Exception { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NULL_INT"); + } catch (final SQLException sqle) { + } + + stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))"); + + runner.setIncomingConnection(true); + runner.setProperty(ExecuteSQL.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)"); + runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT"); + // Simulate failure by not provide parameter + runner.setProperty(ExecuteSQL.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()"); + MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1); + runner.addControllerService("writer", recordWriter); + runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer"); + runner.enableControllerService(recordWriter); + runner.enqueue("test".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1); + MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_FAILURE).get(0); + firstFlowFile.assertContentEquals("test"); + } + /** * Simple implementation only for ExecuteSQL processor testing.