[ https://issues.apache.org/jira/browse/NIFI-3031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15735425#comment-15735425 ]

ASF GitHub Bot commented on NIFI-3031:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1217#discussion_r91722035
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java ---
    @@ -135,42 +197,100 @@
             return relationships;
         }
     
    +    @OnScheduled
    +    public void setup(ProcessContext context) {
    +        // If the query is not set, then an incoming flow file is needed. Otherwise fail the initialization
    +        if (!context.getProperty(HIVEQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) {
    +            final String errorString = "Either the Select Query must be specified or there must be an incoming connection "
    +                    + "providing flowfile(s) containing a SQL select query";
    +            getLogger().error(errorString);
    +            throw new ProcessException(errorString);
    +        }
    +    }
    +
         @Override
         public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    -        FlowFile fileToProcess = null;
    -        if (context.hasIncomingConnection()) {
    -            fileToProcess = session.get();
    +        final FlowFile fileToProcess = (context.hasIncomingConnection()? session.get():null);
    +        FlowFile flowfile = null;
     
    -            // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
    -            // However, if we have no FlowFile and we have connections coming from other Processors, then
    -            // we know that we should run only if we have a FlowFile.
    +        // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
    +        // However, if we have no FlowFile and we have connections coming from other Processors, then
    +        // we know that we should run only if we have a FlowFile.
    +        if (context.hasIncomingConnection()) {
                 if (fileToProcess == null && context.hasNonLoopConnection()) {
                     return;
                 }
             }
     
             final ComponentLog logger = getLogger();
             final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
    -        final String selectQuery = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
    +        final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
    +
    +        final boolean flowbased = !(context.getProperty(HIVEQL_SELECT_QUERY).isSet());
    +
    +        // Source the SQL
    +        final String selectQuery;
    +
    +        if (context.getProperty(HIVEQL_SELECT_QUERY).isSet()) {
    +            selectQuery = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
    +        } else {
    +            // If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
    +            // If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
    +            final StringBuilder queryContents = new StringBuilder();
    +            session.read(fileToProcess, new InputStreamCallback() {
    +                @Override
    +                public void process(InputStream in) throws IOException {
    +                    queryContents.append(IOUtils.toString(in));
    +                }
    +            });
    +            selectQuery = queryContents.toString();
    +        }
    +
    +
             final String outputFormat = context.getProperty(HIVEQL_OUTPUT_FORMAT).getValue();
             final StopWatch stopWatch = new StopWatch(true);
    +        final boolean header = context.getProperty(HIVEQL_CSV_HEADER).asBoolean();
    +        final String altHeader = context.getProperty(HIVEQL_CSV_ALT_HEADER).evaluateAttributeExpressions(fileToProcess).getValue();
    +        final String delimiter = context.getProperty(HIVEQL_CSV_DELIMITER).evaluateAttributeExpressions(fileToProcess).getValue();
    +        final boolean quote = context.getProperty(HIVEQL_CSV_QUOTE).asBoolean();
    +        final boolean escape = context.getProperty(HIVEQL_CSV_HEADER).asBoolean();
     
             try (final Connection con = dbcpService.getConnection();
    -             final Statement st = con.createStatement()) {
    +             final Statement st = ( flowbased ? con.prepareStatement(selectQuery): con.createStatement())
    --- End diff --
    
    Isn't it possible to specify a parameterized query in the Select Query 
property, expecting that each flow file has the appropriate attributes set? I'm 
wondering if there's a better check to know when to call prepareStatement vs 
createStatement.
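
    One possible check, as a minimal sketch rather than code from the pull
    request: decide based on whether the flow file actually carries parameter
    attributes, independent of where the query text came from. The attribute
    naming convention hiveql.args.N.type is an assumption here (modelled on the
    way PutHiveQL names its parameters), and the class and method names are
    hypothetical.

```java
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helper: not part of the pull request under review.
final class StatementChoice {

    // Assumed convention: parameters arrive as attributes named hiveql.args.<N>.type
    private static final Pattern PARAM_TYPE =
            Pattern.compile("hiveql\\.args\\.\\d+\\.type");

    private StatementChoice() {
    }

    // Returns true when at least one attribute name looks like a statement parameter.
    static boolean hasParameters(final Map<String, String> attributes) {
        if (attributes == null) {
            return false;
        }
        for (final String name : attributes.keySet()) {
            if (PARAM_TYPE.matcher(name).matches()) {
                return true;
            }
        }
        return false;
    }
}
```

    With a helper like this, the try-with-resources line could call
    con.prepareStatement(selectQuery) when hasParameters(...) returns true for
    the flow file's attributes and con.createStatement() otherwise, so a
    parameterized query in the Select Query property would also be covered.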


> Support Multi-Statement Scripts in the PutHiveQL Processor
> ----------------------------------------------------------
>
>                 Key: NIFI-3031
>                 URL: https://issues.apache.org/jira/browse/NIFI-3031
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Matt Burgess
>
> Trying to use the PutHiveQL processor to execute a HiveQL script that 
> contains multiple statements.
> For example:
> USE my_database;
> FROM my_database_src.base_table
> INSERT OVERWRITE refined_table
> SELECT *;
> -- or --
> use my_database;
> create temporary table WORKING as
> select a,b,c from RAW;
> FROM RAW
> INSERT OVERWRITE refined_table
> SELECT *;
> The current implementation fails even when a single statement ends with a 
> semicolon.
> Either use a default delimiter such as a semicolon to mark the boundaries of 
> each statement within the file, or allow users to define their own.
> This enables the building of pipelines that are testable by not embedding 
> HiveQL into a product; rather sourcing them from files.  And the scripts can 
> be complex.  Each statement should run in a linear manner and be part of the 
> same JDBC session to ensure things like "temporary" tables will work.
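
The delimiter-based splitting requested above could be sketched as follows. This is a deliberately naive illustration, not the processor's implementation: the class and method names are hypothetical, and it does not handle a delimiter that appears inside a string literal or a comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper: splits a HiveQL script into individual statements.
final class HiveScriptSplitter {

    private HiveScriptSplitter() {
    }

    // Splits on the literal delimiter, trims each piece, and drops empty pieces,
    // so a trailing delimiter after the last statement is tolerated.
    static List<String> split(final String script, final String delimiter) {
        final List<String> statements = new ArrayList<>();
        for (final String part : script.split(Pattern.quote(delimiter))) {
            final String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                statements.add(trimmed);
            }
        }
        return statements;
    }
}
```

Each returned statement would then be executed in order on the same JDBC Connection, which is what keeps temporary tables visible to later statements.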



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
