Repository: nifi
Updated Branches:
  refs/heads/master 9b461027a -> 187417d07


NIFI-5044 SelectHiveQL accept only one statement

SelectHiveQL support only single SELECT statement.
This change adds support for pre- and post- select statements.
It will be useful for configuration queries, i.e. "set tez.queue.name=default", 
and others.

renamed selectQuery to hqlStatement for better readability

style check correction in comment

removed meaningless examples for post-queries

execute query without result set

Signed-off-by: Matthew Burgess <mattyb...@apache.org>

This closes #2695


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/187417d0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/187417d0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/187417d0

Branch: refs/heads/master
Commit: 187417d0772d8c08c7efb04c7a0dfda31b20d607
Parents: 9b46102
Author: Ed B <eberezitsky@HW14163.local>
Authored: Sun May 13 01:24:09 2018 -0400
Committer: Matthew Burgess <mattyb...@apache.org>
Committed: Fri Jun 15 15:48:16 2018 -0400

----------------------------------------------------------------------
 .../nifi/processors/hive/SelectHiveQL.java      | 119 ++++++++++---
 .../nifi/processors/hive/TestSelectHiveQL.java  | 167 +++++++++++++++----
 2 files changed, 230 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/187417d0/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
index b811278..5342c09 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -35,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -100,10 +102,21 @@ public class SelectHiveQL extends AbstractHiveQLProcessor 
{
             .build();
     public static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
-            .description("HiveQL query execution failed. Incoming FlowFile 
will be penalized and routed to this relationship")
+            .description("HiveQL query execution failed. Incoming FlowFile 
will be penalized and routed to this relationship.")
             .build();
 
 
+    public static final PropertyDescriptor HIVEQL_PRE_QUERY = new 
PropertyDescriptor.Builder()
+            .name("hive-pre-query")
+            .displayName("HiveQL Pre-Query")
+            .description("HiveQL pre-query to execute. Semicolon-delimited 
list of queries. "
+                    + "Example: 'set tez.queue.name=queue1; set 
hive.exec.orc.split.strategy=ETL; set 
hive.exec.reducers.bytes.per.reducer=1073741824'. "
+                    + "Note, the results/outputs of these queries will be 
suppressed if successfully executed.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
     public static final PropertyDescriptor HIVEQL_SELECT_QUERY = new 
PropertyDescriptor.Builder()
             .name("hive-query")
             .displayName("HiveQL Select Query")
@@ -113,6 +126,16 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
+    public static final PropertyDescriptor HIVEQL_POST_QUERY = new 
PropertyDescriptor.Builder()
+            .name("hive-post-query")
+            .displayName("HiveQL Post-Query")
+            .description("HiveQL post-query to execute. Semicolon-delimited 
list of queries. "
+                    + "Note, the results/outputs of these queries will be 
suppressed if successfully executed.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
     public static final PropertyDescriptor FETCH_SIZE = new 
PropertyDescriptor.Builder()
             .name("hive-fetch-size")
             .displayName("Fetch Size")
@@ -214,7 +237,9 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
     static {
         List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
         _propertyDescriptors.add(HIVE_DBCP_SERVICE);
+        _propertyDescriptors.add(HIVEQL_PRE_QUERY);
         _propertyDescriptors.add(HIVEQL_SELECT_QUERY);
+        _propertyDescriptors.add(HIVEQL_POST_QUERY);
         _propertyDescriptors.add(FETCH_SIZE);
         _propertyDescriptors.add(MAX_ROWS_PER_FLOW_FILE);
         _propertyDescriptors.add(MAX_FRAGMENTS);
@@ -277,19 +302,22 @@ public class SelectHiveQL extends AbstractHiveQLProcessor 
{
         final HiveDBCPService dbcpService = 
context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
         final Charset charset = 
Charset.forName(context.getProperty(CHARSET).getValue());
 
+        List<String> preQueries = 
getQueries(context.getProperty(HIVEQL_PRE_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
+        List<String> postQueries = 
getQueries(context.getProperty(HIVEQL_POST_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
+
         final boolean flowbased = 
!(context.getProperty(HIVEQL_SELECT_QUERY).isSet());
 
         // Source the SQL
-        final String selectQuery;
+        String hqlStatement;
 
         if (context.getProperty(HIVEQL_SELECT_QUERY).isSet()) {
-            selectQuery = 
context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
+            hqlStatement = 
context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
         } else {
             // If the query is not set, then an incoming flow file is 
required, and expected to contain a valid SQL select query.
             // If there is no incoming connection, onTrigger will not be 
called as the processor will fail when scheduled.
             final StringBuilder queryContents = new StringBuilder();
             session.read(fileToProcess, in -> 
queryContents.append(IOUtils.toString(in, charset)));
-            selectQuery = queryContents.toString();
+            hqlStatement = queryContents.toString();
         }
 
 
@@ -309,9 +337,16 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
         final String fragmentIdentifier = UUID.randomUUID().toString();
 
         try (final Connection con = dbcpService.getConnection(fileToProcess == 
null ? Collections.emptyMap() : fileToProcess.getAttributes());
-             final Statement st = (flowbased ? 
con.prepareStatement(selectQuery) : con.createStatement())
+             final Statement st = (flowbased ? 
con.prepareStatement(hqlStatement) : con.createStatement())
         ) {
-
+            Pair<String,SQLException> failure = executeConfigStatements(con, 
preQueries);
+            if (failure != null) {
+                // In case of failure, assigning config query to 
"hqlStatement"  to follow current error handling
+                hqlStatement = failure.getLeft();
+                flowfile = (fileToProcess == null) ? session.create() : 
fileToProcess;
+                fileToProcess = null;
+                throw failure.getRight();
+            }
             if (fetchSize != null && fetchSize > 0) {
                 try {
                     st.setFetchSize(fetchSize);
@@ -323,14 +358,14 @@ public class SelectHiveQL extends AbstractHiveQLProcessor 
{
 
             final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
             try {
-                logger.debug("Executing query {}", new Object[]{selectQuery});
+                logger.debug("Executing query {}", new Object[]{hqlStatement});
                 if (flowbased) {
                     // Hive JDBC Doesn't Support this yet:
                     // ParameterMetaData pmd = 
((PreparedStatement)st).getParameterMetaData();
                     // int paramCount = pmd.getParameterCount();
 
                     // Alternate way to determine number of params in SQL.
-                    int paramCount = StringUtils.countMatches(selectQuery, 
"?");
+                    int paramCount = StringUtils.countMatches(hqlStatement, 
"?");
 
                     if (paramCount > 0) {
                         setParameters(1, (PreparedStatement) st, paramCount, 
fileToProcess.getAttributes());
@@ -340,7 +375,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
                 final ResultSet resultSet;
 
                 try {
-                    resultSet = (flowbased ? ((PreparedStatement) 
st).executeQuery() : st.executeQuery(selectQuery));
+                    resultSet = (flowbased ? ((PreparedStatement) 
st).executeQuery() : st.executeQuery(hqlStatement));
                 } catch (SQLException se) {
                     // If an error occurs during the query, a flowfile is 
expected to be routed to failure, so ensure one here
                     flowfile = (fileToProcess == null) ? session.create() : 
fileToProcess;
@@ -385,10 +420,10 @@ public class SelectHiveQL extends AbstractHiveQLProcessor 
{
 
                         try {
                             // Set input/output table names by parsing the 
query
-                            
attributes.putAll(toQueryTableAttributes(findTableNames(selectQuery)));
+                            
attributes.putAll(toQueryTableAttributes(findTableNames(hqlStatement)));
                         } catch (Exception e) {
                             // If failed to parse the query, just log a 
warning message, but continue.
-                            getLogger().warn("Failed to parse query: {} due to 
{}", new Object[]{selectQuery, e}, e);
+                            getLogger().warn("Failed to parse query: {} due to 
{}", new Object[]{hqlStatement, e}, e);
                         }
 
                         // Set MIME type on output document and add extension 
to filename
@@ -422,6 +457,9 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
                     } else {
                         // If there were no rows returned (and the first flow 
file has been sent, we're done processing, so remove the flowfile and carry on
                         session.remove(flowfile);
+                        if (resultSetFlowFiles != null && 
resultSetFlowFiles.size()>0) {
+                            flowfile = 
resultSetFlowFiles.get(resultSetFlowFiles.size()-1);
+                        }
                         break;
                     }
 
@@ -443,31 +481,72 @@ public class SelectHiveQL extends AbstractHiveQLProcessor 
{
                 throw e;
             }
 
-            session.transfer(resultSetFlowFiles, REL_SUCCESS);
+            failure = executeConfigStatements(con, postQueries);
+            if (failure != null) {
+                hqlStatement = failure.getLeft();
+                if (resultSetFlowFiles != null) {
+                    resultSetFlowFiles.forEach(ff -> session.remove(ff));
+                }
+                flowfile = (fileToProcess == null) ? session.create() : 
fileToProcess;
+                fileToProcess = null;
+                throw failure.getRight();
+            }
 
+            session.transfer(resultSetFlowFiles, REL_SUCCESS);
+            if (fileToProcess != null) {
+                session.remove(fileToProcess);
+            }
         } catch (final ProcessException | SQLException e) {
-            logger.error("Issue processing SQL {} due to {}.", new 
Object[]{selectQuery, e});
+            logger.error("Issue processing SQL {} due to {}.", new 
Object[]{hqlStatement, e});
             if (flowfile == null) {
                 // This can happen if any exceptions occur while setting up 
the connection, statement, etc.
                 logger.error("Unable to execute HiveQL select query {} due to 
{}. No FlowFile to route to failure",
-                        new Object[]{selectQuery, e});
+                        new Object[]{hqlStatement, e});
                 context.yield();
             } else {
                 if (context.hasIncomingConnection()) {
                     logger.error("Unable to execute HiveQL select query {} for 
{} due to {}; routing to failure",
-                            new Object[]{selectQuery, flowfile, e});
+                            new Object[]{hqlStatement, flowfile, e});
                     flowfile = session.penalize(flowfile);
                 } else {
                     logger.error("Unable to execute HiveQL select query {} due 
to {}; routing to failure",
-                            new Object[]{selectQuery, e});
+                            new Object[]{hqlStatement, e});
                     context.yield();
                 }
                 session.transfer(flowfile, REL_FAILURE);
             }
-        } finally {
-            if (fileToProcess != null) {
-                session.remove(fileToProcess);
+        }
+    }
+
+    /*
+     * Executes given queries using pre-defined connection.
+     * Returns null on success, or a query string if failed.
+     */
+    protected Pair<String,SQLException> executeConfigStatements(final 
Connection con, final List<String> configQueries){
+        if (configQueries == null || configQueries.isEmpty()) {
+            return null;
+        }
+
+        for (String confSQL : configQueries) {
+            try(final Statement st = con.createStatement()){
+                st.execute(confSQL);
+            } catch (SQLException e) {
+                return Pair.of(confSQL, e);
+            }
+        }
+        return null;
+    }
+
+    protected List<String> getQueries(final String value) {
+        if (value == null || value.length() == 0 || value.trim().length() == 
0) {
+            return null;
+        }
+        final List<String> queries = new LinkedList<>();
+        for (String query : value.split(";")) {
+            if (query.trim().length() > 0) {
+                queries.add(query.trim());
             }
         }
+        return queries;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/187417d0/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
index 43c47c8..484fa67 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
@@ -66,6 +66,8 @@ public class TestSelectHiveQL {
 
     private static final Logger LOGGER;
     private final static String MAX_ROWS_KEY = "maxRows";
+    private final int NUM_OF_ROWS = 100;
+
 
     static {
         System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
@@ -199,6 +201,51 @@ public class TestSelectHiveQL {
     }
 
     @Test
+    public void invokeOnTriggerExceptionInPreQieriesNoIncomingFlows()
+            throws InitializationException, ClassNotFoundException, 
SQLException, IOException {
+
+        doOnTrigger(QUERY_WITHOUT_EL, false, CSV,
+                "select 'no exception' from persons; select exception from 
persons",
+                null);
+
+        runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void invokeOnTriggerExceptionInPreQieriesWithIncomingFlows()
+            throws InitializationException, ClassNotFoundException, 
SQLException, IOException {
+
+        doOnTrigger(QUERY_WITHOUT_EL, true, CSV,
+                "select 'no exception' from persons; select exception from 
persons",
+                null);
+
+        runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void invokeOnTriggerExceptionInPostQieriesNoIncomingFlows()
+            throws InitializationException, ClassNotFoundException, 
SQLException, IOException {
+
+        doOnTrigger(QUERY_WITHOUT_EL, false, CSV,
+                null,
+                "select 'no exception' from persons; select exception from 
persons");
+
+        runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void invokeOnTriggerExceptionInPostQieriesWithIncomingFlows()
+            throws InitializationException, ClassNotFoundException, 
SQLException, IOException {
+
+        doOnTrigger(QUERY_WITHOUT_EL, true, CSV,
+                null,
+                "select 'no exception' from persons; select exception from 
persons");
+
+        // with incoming connections, it should be rolled back
+        runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_FAILURE, 1);
+    }
+
+    @Test
     public void testWithBadSQL() throws SQLException {
         final String BAD_SQL = "create table TEST_NO_ROWS (id integer)";
 
@@ -235,45 +282,45 @@ public class TestSelectHiveQL {
         invokeOnTrigger(QUERY_WITHOUT_EL, false, AVRO);
     }
 
-    public void invokeOnTrigger(final String query, final boolean 
incomingFlowFile, String outputFormat)
+    @Test
+    public void invokeOnTriggerWithValidPreQieries()
             throws InitializationException, ClassNotFoundException, 
SQLException, IOException {
+        invokeOnTrigger(QUERY_WITHOUT_EL, false, CSV,
+                "select '1' from persons; select '2' from persons", //should 
not be 'select'. But Derby driver doesn't support "set param=val" format.
+                null);
+    }
 
-        // remove previous test database, if any
-        final File dbLocation = new File(DB_LOCATION);
-        dbLocation.delete();
-
-        // load test data to database
-        final Connection con = ((HiveDBCPService) 
runner.getControllerService("dbcp")).getConnection();
-        final Statement stmt = con.createStatement();
-        try {
-            stmt.execute("drop table persons");
-        } catch (final SQLException sqle) {
-            // Nothing to do here, the table didn't exist
-        }
+    @Test
+    public void invokeOnTriggerWithValidPostQieries()
+            throws InitializationException, ClassNotFoundException, 
SQLException, IOException {
+        invokeOnTrigger(QUERY_WITHOUT_EL, false, CSV,
+                null,
+                    //should not be 'select'. But Derby driver doesn't support 
"set param=val" format,
+                    //so just providing any "compilable" query.
+                " select '4' from persons; \nselect '5' from persons");
+    }
 
-        stmt.execute("create table persons (id integer, name varchar(100), 
code integer)");
-        Random rng = new Random(53496);
-        final int nrOfRows = 100;
-        stmt.executeUpdate("insert into persons values (1, 'Joe Smith', " + 
rng.nextInt(469947) + ")");
-        for (int i = 2; i < nrOfRows; i++) {
-            stmt.executeUpdate("insert into persons values (" + i + ", 
'Someone Else', " + rng.nextInt(469947) + ")");
-        }
-        stmt.executeUpdate("insert into persons values (" + nrOfRows + ", 
'Last Person', NULL)");
+    @Test
+    public void invokeOnTriggerWithValidPrePostQieries()
+            throws InitializationException, ClassNotFoundException, 
SQLException, IOException {
+        invokeOnTrigger(QUERY_WITHOUT_EL, false, CSV,
+                    //should not be 'select'. But Derby driver doesn't support 
"set param=val" format,
+                    //so just providing any "compilable" query.
+                "select '1' from persons; select '2' from persons",
+                " select '4' from persons; \nselect '5' from persons");
+    }
 
-        LOGGER.info("test data loaded");
 
-        runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, query);
-        runner.setProperty(HIVEQL_OUTPUT_FORMAT, outputFormat);
+    public void invokeOnTrigger(final String query, final boolean 
incomingFlowFile, String outputFormat)
+            throws InitializationException, ClassNotFoundException, 
SQLException, IOException {
+        invokeOnTrigger(query, incomingFlowFile, outputFormat, null, null);
+    }
 
-        if (incomingFlowFile) {
-            // incoming FlowFile content is not used, but attributes are used
-            final Map<String, String> attributes = new HashMap<>();
-            attributes.put("person.id", "10");
-            runner.enqueue("Hello".getBytes(), attributes);
-        }
+    public void invokeOnTrigger(final String query, final boolean 
incomingFlowFile, String outputFormat,
+            String preQueries, String postQueries)
+            throws InitializationException, ClassNotFoundException, 
SQLException, IOException {
 
-        runner.setIncomingConnection(incomingFlowFile);
-        runner.run();
+        TestRunner runner = doOnTrigger(query, incomingFlowFile, outputFormat, 
preQueries, postQueries);
         runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 1);
 
         final List<MockFlowFile> flowfiles = 
runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS);
@@ -306,7 +353,7 @@ public class TestSelectHiveQL {
             while ((line = br.readLine()) != null) {
                 recordsFromStream++;
                 String[] values = line.split(",");
-                if (recordsFromStream < (nrOfRows - 10)) {
+                if (recordsFromStream < (NUM_OF_ROWS - 10)) {
                     assertEquals(3, values.length);
                     assertTrue(values[1].startsWith("\""));
                     assertTrue(values[1].endsWith("\""));
@@ -315,11 +362,60 @@ public class TestSelectHiveQL {
                 }
             }
         }
-        assertEquals(nrOfRows - 10, recordsFromStream);
+        assertEquals(NUM_OF_ROWS - 10, recordsFromStream);
         assertEquals(recordsFromStream, 
Integer.parseInt(flowFile.getAttribute(SelectHiveQL.RESULT_ROW_COUNT)));
         
flowFile.assertAttributeEquals(AbstractHiveQLProcessor.ATTR_INPUT_TABLES, 
"persons");
     }
 
+    public TestRunner doOnTrigger(final String query, final boolean 
incomingFlowFile, String outputFormat,
+            String preQueries, String postQueries)
+            throws InitializationException, ClassNotFoundException, 
SQLException, IOException {
+
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((HiveDBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        final Statement stmt = con.createStatement();
+        try {
+            stmt.execute("drop table persons");
+        } catch (final SQLException sqle) {
+            // Nothing to do here, the table didn't exist
+        }
+
+        stmt.execute("create table persons (id integer, name varchar(100), 
code integer)");
+        Random rng = new Random(53496);
+        stmt.executeUpdate("insert into persons values (1, 'Joe Smith', " + 
rng.nextInt(469947) + ")");
+        for (int i = 2; i < NUM_OF_ROWS; i++) {
+            stmt.executeUpdate("insert into persons values (" + i + ", 
'Someone Else', " + rng.nextInt(469947) + ")");
+        }
+        stmt.executeUpdate("insert into persons values (" + NUM_OF_ROWS + ", 
'Last Person', NULL)");
+
+        LOGGER.info("test data loaded");
+
+        runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, query);
+        runner.setProperty(HIVEQL_OUTPUT_FORMAT, outputFormat);
+        if (preQueries != null) {
+            runner.setProperty(SelectHiveQL.HIVEQL_PRE_QUERY, preQueries);
+        }
+        if (postQueries != null) {
+            runner.setProperty(SelectHiveQL.HIVEQL_POST_QUERY, postQueries);
+        }
+
+        if (incomingFlowFile) {
+            // incoming FlowFile content is not used, but attributes are used
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put("person.id", "10");
+            runner.enqueue("Hello".getBytes(), attributes);
+        }
+
+        runner.setIncomingConnection(incomingFlowFile);
+        runner.run();
+
+        return runner;
+    }
+
     @Test
     public void testMaxRowsPerFlowFileAvro() throws ClassNotFoundException, 
SQLException, InitializationException, IOException {
 
@@ -556,5 +652,4 @@ public class TestSelectHiveQL {
             return "jdbc:derby:" + DB_LOCATION + ";create=true";
         }
     }
-
-}
+}
\ No newline at end of file

Reply via email to