Repository: nifi Updated Branches: refs/heads/master 9b461027a -> 187417d07
NIFI-5044 SelectHiveQL accept only one statement SelectHiveQL supports only a single SELECT statement. This change adds support for pre- and post-select statements. It will be useful for configuration queries, e.g. "set tez.queue.name=default", and others. Renamed selectQuery to hqlStatement for better readability; style check correction in comment; removed meaningless examples for post-queries; execute query without result set. Signed-off-by: Matthew Burgess <mattyb...@apache.org> This closes #2695 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/187417d0 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/187417d0 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/187417d0 Branch: refs/heads/master Commit: 187417d0772d8c08c7efb04c7a0dfda31b20d607 Parents: 9b46102 Author: Ed B <eberezitsky@HW14163.local> Authored: Sun May 13 01:24:09 2018 -0400 Committer: Matthew Burgess <mattyb...@apache.org> Committed: Fri Jun 15 15:48:16 2018 -0400 ---------------------------------------------------------------------- .../nifi/processors/hive/SelectHiveQL.java | 119 ++++++++++--- .../nifi/processors/hive/TestSelectHiveQL.java | 167 +++++++++++++++---- 2 files changed, 230 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/187417d0/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java index b811278..5342c09 100644 --- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -35,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -100,10 +102,21 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { .build(); public static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure") - .description("HiveQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship") + .description("HiveQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship.") .build(); + public static final PropertyDescriptor HIVEQL_PRE_QUERY = new PropertyDescriptor.Builder() + .name("hive-pre-query") + .displayName("HiveQL Pre-Query") + .description("HiveQL pre-query to execute. Semicolon-delimited list of queries. " + + "Example: 'set tez.queue.name=queue1; set hive.exec.orc.split.strategy=ETL; set hive.exec.reducers.bytes.per.reducer=1073741824'. 
" + + "Note, the results/outputs of these queries will be suppressed if successfully executed.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final PropertyDescriptor HIVEQL_SELECT_QUERY = new PropertyDescriptor.Builder() .name("hive-query") .displayName("HiveQL Select Query") @@ -113,6 +126,16 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); + public static final PropertyDescriptor HIVEQL_POST_QUERY = new PropertyDescriptor.Builder() + .name("hive-post-query") + .displayName("HiveQL Post-Query") + .description("HiveQL post-query to execute. Semicolon-delimited list of queries. " + + "Note, the results/outputs of these queries will be suppressed if successfully executed.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder() .name("hive-fetch-size") .displayName("Fetch Size") @@ -214,7 +237,9 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { static { List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); _propertyDescriptors.add(HIVE_DBCP_SERVICE); + _propertyDescriptors.add(HIVEQL_PRE_QUERY); _propertyDescriptors.add(HIVEQL_SELECT_QUERY); + _propertyDescriptors.add(HIVEQL_POST_QUERY); _propertyDescriptors.add(FETCH_SIZE); _propertyDescriptors.add(MAX_ROWS_PER_FLOW_FILE); _propertyDescriptors.add(MAX_FRAGMENTS); @@ -277,19 +302,22 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class); final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); + List<String> preQueries = 
getQueries(context.getProperty(HIVEQL_PRE_QUERY).evaluateAttributeExpressions(fileToProcess).getValue()); + List<String> postQueries = getQueries(context.getProperty(HIVEQL_POST_QUERY).evaluateAttributeExpressions(fileToProcess).getValue()); + final boolean flowbased = !(context.getProperty(HIVEQL_SELECT_QUERY).isSet()); // Source the SQL - final String selectQuery; + String hqlStatement; if (context.getProperty(HIVEQL_SELECT_QUERY).isSet()) { - selectQuery = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); + hqlStatement = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); } else { // If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query. // If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled. final StringBuilder queryContents = new StringBuilder(); session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, charset))); - selectQuery = queryContents.toString(); + hqlStatement = queryContents.toString(); } @@ -309,9 +337,16 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { final String fragmentIdentifier = UUID.randomUUID().toString(); try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes()); - final Statement st = (flowbased ? con.prepareStatement(selectQuery) : con.createStatement()) + final Statement st = (flowbased ? con.prepareStatement(hqlStatement) : con.createStatement()) ) { - + Pair<String,SQLException> failure = executeConfigStatements(con, preQueries); + if (failure != null) { + // In case of failure, assigning config query to "hqlStatement" to follow current error handling + hqlStatement = failure.getLeft(); + flowfile = (fileToProcess == null) ? 
session.create() : fileToProcess; + fileToProcess = null; + throw failure.getRight(); + } if (fetchSize != null && fetchSize > 0) { try { st.setFetchSize(fetchSize); @@ -323,14 +358,14 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { final List<FlowFile> resultSetFlowFiles = new ArrayList<>(); try { - logger.debug("Executing query {}", new Object[]{selectQuery}); + logger.debug("Executing query {}", new Object[]{hqlStatement}); if (flowbased) { // Hive JDBC Doesn't Support this yet: // ParameterMetaData pmd = ((PreparedStatement)st).getParameterMetaData(); // int paramCount = pmd.getParameterCount(); // Alternate way to determine number of params in SQL. - int paramCount = StringUtils.countMatches(selectQuery, "?"); + int paramCount = StringUtils.countMatches(hqlStatement, "?"); if (paramCount > 0) { setParameters(1, (PreparedStatement) st, paramCount, fileToProcess.getAttributes()); @@ -340,7 +375,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { final ResultSet resultSet; try { - resultSet = (flowbased ? ((PreparedStatement) st).executeQuery() : st.executeQuery(selectQuery)); + resultSet = (flowbased ? ((PreparedStatement) st).executeQuery() : st.executeQuery(hqlStatement)); } catch (SQLException se) { // If an error occurs during the query, a flowfile is expected to be routed to failure, so ensure one here flowfile = (fileToProcess == null) ? session.create() : fileToProcess; @@ -385,10 +420,10 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { try { // Set input/output table names by parsing the query - attributes.putAll(toQueryTableAttributes(findTableNames(selectQuery))); + attributes.putAll(toQueryTableAttributes(findTableNames(hqlStatement))); } catch (Exception e) { // If failed to parse the query, just log a warning message, but continue. 
- getLogger().warn("Failed to parse query: {} due to {}", new Object[]{selectQuery, e}, e); + getLogger().warn("Failed to parse query: {} due to {}", new Object[]{hqlStatement, e}, e); } // Set MIME type on output document and add extension to filename @@ -422,6 +457,9 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { } else { // If there were no rows returned (and the first flow file has been sent, we're done processing, so remove the flowfile and carry on session.remove(flowfile); + if (resultSetFlowFiles != null && resultSetFlowFiles.size()>0) { + flowfile = resultSetFlowFiles.get(resultSetFlowFiles.size()-1); + } break; } @@ -443,31 +481,72 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { throw e; } - session.transfer(resultSetFlowFiles, REL_SUCCESS); + failure = executeConfigStatements(con, postQueries); + if (failure != null) { + hqlStatement = failure.getLeft(); + if (resultSetFlowFiles != null) { + resultSetFlowFiles.forEach(ff -> session.remove(ff)); + } + flowfile = (fileToProcess == null) ? session.create() : fileToProcess; + fileToProcess = null; + throw failure.getRight(); + } + session.transfer(resultSetFlowFiles, REL_SUCCESS); + if (fileToProcess != null) { + session.remove(fileToProcess); + } } catch (final ProcessException | SQLException e) { - logger.error("Issue processing SQL {} due to {}.", new Object[]{selectQuery, e}); + logger.error("Issue processing SQL {} due to {}.", new Object[]{hqlStatement, e}); if (flowfile == null) { // This can happen if any exceptions occur while setting up the connection, statement, etc. logger.error("Unable to execute HiveQL select query {} due to {}. 
No FlowFile to route to failure", - new Object[]{selectQuery, e}); + new Object[]{hqlStatement, e}); context.yield(); } else { if (context.hasIncomingConnection()) { logger.error("Unable to execute HiveQL select query {} for {} due to {}; routing to failure", - new Object[]{selectQuery, flowfile, e}); + new Object[]{hqlStatement, flowfile, e}); flowfile = session.penalize(flowfile); } else { logger.error("Unable to execute HiveQL select query {} due to {}; routing to failure", - new Object[]{selectQuery, e}); + new Object[]{hqlStatement, e}); context.yield(); } session.transfer(flowfile, REL_FAILURE); } - } finally { - if (fileToProcess != null) { - session.remove(fileToProcess); + } + } + + /* + * Executes the given queries, in order, using the supplied connection. + * Returns null on success, or a pair of the failed query string and the SQLException it raised. + */ + protected Pair<String,SQLException> executeConfigStatements(final Connection con, final List<String> configQueries){ + if (configQueries == null || configQueries.isEmpty()) { + return null; + } + + for (String confSQL : configQueries) { + try(final Statement st = con.createStatement()){ + st.execute(confSQL); + } catch (SQLException e) { + return Pair.of(confSQL, e); + } + } + return null; + } + + protected List<String> getQueries(final String value) { + if (value == null || value.length() == 0 || value.trim().length() == 0) { + return null; + } + final List<String> queries = new LinkedList<>(); + for (String query : value.split(";")) { + if (query.trim().length() > 0) { + queries.add(query.trim()); } } + return queries; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/187417d0/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java index 43c47c8..484fa67 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java @@ -66,6 +66,8 @@ public class TestSelectHiveQL { private static final Logger LOGGER; private final static String MAX_ROWS_KEY = "maxRows"; + private final int NUM_OF_ROWS = 100; + static { System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); @@ -199,6 +201,51 @@ public class TestSelectHiveQL { } @Test + public void invokeOnTriggerExceptionInPreQieriesNoIncomingFlows() + throws InitializationException, ClassNotFoundException, SQLException, IOException { + + doOnTrigger(QUERY_WITHOUT_EL, false, CSV, + "select 'no exception' from persons; select exception from persons", + null); + + runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1); + } + + @Test + public void invokeOnTriggerExceptionInPreQieriesWithIncomingFlows() + throws InitializationException, ClassNotFoundException, SQLException, IOException { + + doOnTrigger(QUERY_WITHOUT_EL, true, CSV, + "select 'no exception' from persons; select exception from persons", + null); + + runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1); + } + + @Test + public void invokeOnTriggerExceptionInPostQieriesNoIncomingFlows() + throws InitializationException, ClassNotFoundException, SQLException, IOException { + + doOnTrigger(QUERY_WITHOUT_EL, false, CSV, + null, + "select 'no exception' from persons; select exception from persons"); + + runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1); + } + + @Test + public void invokeOnTriggerExceptionInPostQieriesWithIncomingFlows() + throws InitializationException, ClassNotFoundException, SQLException, IOException { + + 
doOnTrigger(QUERY_WITHOUT_EL, true, CSV, + null, + "select 'no exception' from persons; select exception from persons"); + + // with incoming connections, it should be rolled back + runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1); + } + + @Test public void testWithBadSQL() throws SQLException { final String BAD_SQL = "create table TEST_NO_ROWS (id integer)"; @@ -235,45 +282,45 @@ public class TestSelectHiveQL { invokeOnTrigger(QUERY_WITHOUT_EL, false, AVRO); } - public void invokeOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat) + @Test + public void invokeOnTriggerWithValidPreQieries() throws InitializationException, ClassNotFoundException, SQLException, IOException { + invokeOnTrigger(QUERY_WITHOUT_EL, false, CSV, + "select '1' from persons; select '2' from persons", //should not be 'select'. But Derby driver doesn't support "set param=val" format. + null); + } - // remove previous test database, if any - final File dbLocation = new File(DB_LOCATION); - dbLocation.delete(); - - // load test data to database - final Connection con = ((HiveDBCPService) runner.getControllerService("dbcp")).getConnection(); - final Statement stmt = con.createStatement(); - try { - stmt.execute("drop table persons"); - } catch (final SQLException sqle) { - // Nothing to do here, the table didn't exist - } + @Test + public void invokeOnTriggerWithValidPostQieries() + throws InitializationException, ClassNotFoundException, SQLException, IOException { + invokeOnTrigger(QUERY_WITHOUT_EL, false, CSV, + null, + //should not be 'select'. But Derby driver doesn't support "set param=val" format, + //so just providing any "compilable" query. 
+ " select '4' from persons; \nselect '5' from persons"); + } - stmt.execute("create table persons (id integer, name varchar(100), code integer)"); - Random rng = new Random(53496); - final int nrOfRows = 100; - stmt.executeUpdate("insert into persons values (1, 'Joe Smith', " + rng.nextInt(469947) + ")"); - for (int i = 2; i < nrOfRows; i++) { - stmt.executeUpdate("insert into persons values (" + i + ", 'Someone Else', " + rng.nextInt(469947) + ")"); - } - stmt.executeUpdate("insert into persons values (" + nrOfRows + ", 'Last Person', NULL)"); + @Test + public void invokeOnTriggerWithValidPrePostQieries() + throws InitializationException, ClassNotFoundException, SQLException, IOException { + invokeOnTrigger(QUERY_WITHOUT_EL, false, CSV, + //should not be 'select'. But Derby driver doesn't support "set param=val" format, + //so just providing any "compilable" query. + "select '1' from persons; select '2' from persons", + " select '4' from persons; \nselect '5' from persons"); + } - LOGGER.info("test data loaded"); - runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, query); - runner.setProperty(HIVEQL_OUTPUT_FORMAT, outputFormat); + public void invokeOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat) + throws InitializationException, ClassNotFoundException, SQLException, IOException { + invokeOnTrigger(query, incomingFlowFile, outputFormat, null, null); + } - if (incomingFlowFile) { - // incoming FlowFile content is not used, but attributes are used - final Map<String, String> attributes = new HashMap<>(); - attributes.put("person.id", "10"); - runner.enqueue("Hello".getBytes(), attributes); - } + public void invokeOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat, + String preQueries, String postQueries) + throws InitializationException, ClassNotFoundException, SQLException, IOException { - runner.setIncomingConnection(incomingFlowFile); - runner.run(); + TestRunner runner = doOnTrigger(query, 
incomingFlowFile, outputFormat, preQueries, postQueries); runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 1); final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS); @@ -306,7 +353,7 @@ public class TestSelectHiveQL { while ((line = br.readLine()) != null) { recordsFromStream++; String[] values = line.split(","); - if (recordsFromStream < (nrOfRows - 10)) { + if (recordsFromStream < (NUM_OF_ROWS - 10)) { assertEquals(3, values.length); assertTrue(values[1].startsWith("\"")); assertTrue(values[1].endsWith("\"")); @@ -315,11 +362,60 @@ public class TestSelectHiveQL { } } } - assertEquals(nrOfRows - 10, recordsFromStream); + assertEquals(NUM_OF_ROWS - 10, recordsFromStream); assertEquals(recordsFromStream, Integer.parseInt(flowFile.getAttribute(SelectHiveQL.RESULT_ROW_COUNT))); flowFile.assertAttributeEquals(AbstractHiveQLProcessor.ATTR_INPUT_TABLES, "persons"); } + public TestRunner doOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat, + String preQueries, String postQueries) + throws InitializationException, ClassNotFoundException, SQLException, IOException { + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((HiveDBCPService) runner.getControllerService("dbcp")).getConnection(); + final Statement stmt = con.createStatement(); + try { + stmt.execute("drop table persons"); + } catch (final SQLException sqle) { + // Nothing to do here, the table didn't exist + } + + stmt.execute("create table persons (id integer, name varchar(100), code integer)"); + Random rng = new Random(53496); + stmt.executeUpdate("insert into persons values (1, 'Joe Smith', " + rng.nextInt(469947) + ")"); + for (int i = 2; i < NUM_OF_ROWS; i++) { + stmt.executeUpdate("insert into persons values (" + i + ", 'Someone Else', " + rng.nextInt(469947) + ")"); + } + 
stmt.executeUpdate("insert into persons values (" + NUM_OF_ROWS + ", 'Last Person', NULL)"); + + LOGGER.info("test data loaded"); + + runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, query); + runner.setProperty(HIVEQL_OUTPUT_FORMAT, outputFormat); + if (preQueries != null) { + runner.setProperty(SelectHiveQL.HIVEQL_PRE_QUERY, preQueries); + } + if (postQueries != null) { + runner.setProperty(SelectHiveQL.HIVEQL_POST_QUERY, postQueries); + } + + if (incomingFlowFile) { + // incoming FlowFile content is not used, but attributes are used + final Map<String, String> attributes = new HashMap<>(); + attributes.put("person.id", "10"); + runner.enqueue("Hello".getBytes(), attributes); + } + + runner.setIncomingConnection(incomingFlowFile); + runner.run(); + + return runner; + } + @Test public void testMaxRowsPerFlowFileAvro() throws ClassNotFoundException, SQLException, InitializationException, IOException { @@ -556,5 +652,4 @@ public class TestSelectHiveQL { return "jdbc:derby:" + DB_LOCATION + ";create=true"; } } - -} +} \ No newline at end of file