GitHub user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1217#discussion_r91722035
  
    --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java ---
    @@ -135,42 +197,100 @@
             return relationships;
         }
     
    +    @OnScheduled
    +    public void setup(ProcessContext context) {
    +        // If the query is not set, then an incoming flow file is needed. Otherwise fail the initialization
    +        if (!context.getProperty(HIVEQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) {
    +            final String errorString = "Either the Select Query must be specified or there must be an incoming connection "
    +                    + "providing flowfile(s) containing a SQL select query";
    +            getLogger().error(errorString);
    +            throw new ProcessException(errorString);
    +        }
    +    }
    +
         @Override
         public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    -        FlowFile fileToProcess = null;
    -        if (context.hasIncomingConnection()) {
    -            fileToProcess = session.get();
    +        final FlowFile fileToProcess = (context.hasIncomingConnection()? session.get():null);
    +        FlowFile flowfile = null;
     
    -            // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
    -            // However, if we have no FlowFile and we have connections coming from other Processors, then
    -            // we know that we should run only if we have a FlowFile.
    +        // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
    +        // However, if we have no FlowFile and we have connections coming from other Processors, then
    +        // we know that we should run only if we have a FlowFile.
    +        if (context.hasIncomingConnection()) {
                 if (fileToProcess == null && context.hasNonLoopConnection()) {
                     return;
                 }
             }
     
             final ComponentLog logger = getLogger();
             final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
    -        final String selectQuery = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
    +        final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
    +
    +        final boolean flowbased = !(context.getProperty(HIVEQL_SELECT_QUERY).isSet());
    +
    +        // Source the SQL
    +        final String selectQuery;
    +
    +        if (context.getProperty(HIVEQL_SELECT_QUERY).isSet()) {
    +            selectQuery = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
    +        } else {
    +            // If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
    +            // If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
    +            final StringBuilder queryContents = new StringBuilder();
    +            session.read(fileToProcess, new InputStreamCallback() {
    +                @Override
    +                public void process(InputStream in) throws IOException {
    +                    queryContents.append(IOUtils.toString(in));
    +                }
    +            });
    +            selectQuery = queryContents.toString();
    +        }
    +
    +
             final String outputFormat = context.getProperty(HIVEQL_OUTPUT_FORMAT).getValue();
             final StopWatch stopWatch = new StopWatch(true);
    +        final boolean header = context.getProperty(HIVEQL_CSV_HEADER).asBoolean();
    +        final String altHeader = context.getProperty(HIVEQL_CSV_ALT_HEADER).evaluateAttributeExpressions(fileToProcess).getValue();
    +        final String delimiter = context.getProperty(HIVEQL_CSV_DELIMITER).evaluateAttributeExpressions(fileToProcess).getValue();
    +        final boolean quote = context.getProperty(HIVEQL_CSV_QUOTE).asBoolean();
    +        final boolean escape = context.getProperty(HIVEQL_CSV_HEADER).asBoolean();
     
             try (final Connection con = dbcpService.getConnection();
    -             final Statement st = con.createStatement()) {
    +             final Statement st = ( flowbased ? con.prepareStatement(selectQuery): con.createStatement())
    --- End diff --
    
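    Isn't it possible to specify a parameterized query in the Select Query
    property, expecting each flow file to carry the appropriate attributes?
    As written, flowbased is true only when the property is unset, so a
    parameterized query set in the property would go through createStatement.
    I'm wondering if there's a better check for when to call prepareStatement
    vs. createStatement.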
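
    One possible check, as a rough sketch only: look at the query text and
    the flow file attributes rather than at where the query came from. The
    class and method names below are hypothetical, and the attribute
    convention (hiveql.args.N.type) is an assumption borrowed from how
    parameterized statements are usually described for the Hive processors;
    it is not something this PR defines. The placeholder scan is deliberately
    naive and ignores SQL comments and backtick-quoted identifiers.

```java
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the PR: decides whether a query should be
// run through prepareStatement (true) or createStatement (false).
final class StatementChoice {

    // ASSUMPTION: parameters arrive as attributes named hiveql.args.N.type.
    private static final Pattern ARG_TYPE =
            Pattern.compile("hiveql\\.args\\.\\d+\\.type");

    // True if at least one attribute key follows the assumed naming scheme.
    static boolean hasParameterAttributes(Map<String, String> attributes) {
        if (attributes == null) {
            return false;
        }
        for (String key : attributes.keySet()) {
            if (ARG_TYPE.matcher(key).matches()) {
                return true;
            }
        }
        return false;
    }

    // True if the SQL contains a '?' outside single-quoted string literals.
    static boolean hasPlaceholder(String sql) {
        boolean inQuote = false;
        for (int i = 0; i < sql.length(); i++) {
            char c = sql.charAt(i);
            if (c == '\'') {
                inQuote = !inQuote; // a doubled '' toggles twice, so state is kept
            } else if (c == '?' && !inQuote) {
                return true;
            }
        }
        return false;
    }

    // Prepare only when the query has placeholders and values to bind to them.
    static boolean usePreparedStatement(String sql, Map<String, String> attributes) {
        return hasPlaceholder(sql) && hasParameterAttributes(attributes);
    }
}
```

    With something like this, the ternary in the try-with-resources could
    test usePreparedStatement(selectQuery, attributes) instead of flowbased,
    so the decision would be the same whether the query came from the
    property or from the flow file content.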

