[
https://issues.apache.org/jira/browse/NIFI-3031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15735425#comment-15735425
]
ASF GitHub Bot commented on NIFI-3031:
--------------------------------------
Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1217#discussion_r91722035
--- Diff:
nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
---
@@ -135,42 +197,100 @@
return relationships;
}
+    @OnScheduled
+    public void setup(ProcessContext context) {
+        // If the query is not set, then an incoming flow file is needed. Otherwise fail the initialization
+        if (!context.getProperty(HIVEQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) {
+            final String errorString = "Either the Select Query must be specified or there must be an incoming connection "
+                    + "providing flowfile(s) containing a SQL select query";
+            getLogger().error(errorString);
+            throw new ProcessException(errorString);
+        }
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        FlowFile fileToProcess = null;
-        if (context.hasIncomingConnection()) {
-            fileToProcess = session.get();
+        final FlowFile fileToProcess = (context.hasIncomingConnection() ? session.get() : null);
+        FlowFile flowfile = null;
-            // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
-            // However, if we have no FlowFile and we have connections coming from other Processors, then
-            // we know that we should run only if we have a FlowFile.
+        // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
+        // However, if we have no FlowFile and we have connections coming from other Processors, then
+        // we know that we should run only if we have a FlowFile.
+        if (context.hasIncomingConnection()) {
             if (fileToProcess == null && context.hasNonLoopConnection()) {
                 return;
             }
         }
         final ComponentLog logger = getLogger();
         final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
-        final String selectQuery = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
+        final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+
+        final boolean flowbased = !(context.getProperty(HIVEQL_SELECT_QUERY).isSet());
+
+        // Source the SQL
+        final String selectQuery;
+
+        if (context.getProperty(HIVEQL_SELECT_QUERY).isSet()) {
+            selectQuery = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
+        } else {
+            // If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
+            // If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
+            final StringBuilder queryContents = new StringBuilder();
+            session.read(fileToProcess, new InputStreamCallback() {
+                @Override
+                public void process(InputStream in) throws IOException {
+                    queryContents.append(IOUtils.toString(in));
+                }
+            });
+            selectQuery = queryContents.toString();
+        }
+
         final String outputFormat = context.getProperty(HIVEQL_OUTPUT_FORMAT).getValue();
         final StopWatch stopWatch = new StopWatch(true);
+        final boolean header = context.getProperty(HIVEQL_CSV_HEADER).asBoolean();
+        final String altHeader = context.getProperty(HIVEQL_CSV_ALT_HEADER).evaluateAttributeExpressions(fileToProcess).getValue();
+        final String delimiter = context.getProperty(HIVEQL_CSV_DELIMITER).evaluateAttributeExpressions(fileToProcess).getValue();
+        final boolean quote = context.getProperty(HIVEQL_CSV_QUOTE).asBoolean();
+        final boolean escape = context.getProperty(HIVEQL_CSV_HEADER).asBoolean();
         try (final Connection con = dbcpService.getConnection();
-             final Statement st = con.createStatement()) {
+             final Statement st = (flowbased ? con.prepareStatement(selectQuery) : con.createStatement())
--- End diff ---
Isn't it possible to specify a parameterized query in the Select Query
property, expecting that each flow file has the appropriate attributes set? I'm
wondering if there's a better check to know when to call prepareStatement vs
createStatement.
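One possible heuristic for that check (an assumption on my part, not code from this PR): scan the query text for a `?` JDBC placeholder that sits outside any string literal, and call prepareStatement only when one is found. A minimal sketch, with the class name `QueryInspector` invented for illustration:

```java
// Hypothetical helper, not part of the PR: detects unquoted '?' JDBC
// placeholders, which would suggest using prepareStatement over createStatement.
public class QueryInspector {

    public static boolean hasParameters(String sql) {
        boolean inSingle = false;  // inside a '...' literal
        boolean inDouble = false;  // inside a "..." literal
        for (int i = 0; i < sql.length(); i++) {
            char c = sql.charAt(i);
            if (c == '\'' && !inDouble) {
                inSingle = !inSingle;
            } else if (c == '"' && !inSingle) {
                inDouble = !inDouble;
            } else if (c == '?' && !inSingle && !inDouble) {
                return true;  // placeholder found outside any literal
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(hasParameters("SELECT * FROM t WHERE id = ?"));  // true
        System.out.println(hasParameters("SELECT '?' FROM t"));             // false
    }
}
```

This would not catch every edge case (Hive comments, escaped quotes), but it distinguishes a genuinely parameterized Select Query property from one that merely contains a literal question mark.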
> Support Multi-Statement Scripts in the PutHiveQL Processor
> ----------------------------------------------------------
>
> Key: NIFI-3031
> URL: https://issues.apache.org/jira/browse/NIFI-3031
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Matt Burgess
>
> Trying to use the PutHiveQL processor to execute a HiveQL script that
> contains multiple statements.
> E.g.:
> USE my_database;
> FROM my_database_src.base_table
> INSERT OVERWRITE refined_table
> SELECT *;
> -- or --
> use my_database;
> create temporary table WORKING as
> select a,b,c from RAW;
> FROM RAW
> INSERT OVERWRITE refined_table
> SELECT *;
> The current implementation doesn't even accept a trailing semicolon on a
> single statement.
> Either use a default delimiter such as a semicolon to mark statement
> boundaries within the file, or allow users to define their own.
> This enables building testable pipelines by sourcing HiveQL from files
> rather than embedding it in the product, and those scripts can be complex.
> The statements should run sequentially and share the same JDBC session so
> that features like "temporary" tables will work.
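A semicolon-based splitter along the lines proposed in the issue could look like the following sketch (the `StatementSplitter` class is hypothetical, not NiFi code); it skips semicolons inside quoted literals so a statement such as `SELECT ';'` survives intact:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed behavior: split a HiveQL script into
// individual statements on unquoted semicolons, dropping empty fragments.
public class StatementSplitter {

    public static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inSingle = false;  // inside a '...' literal
        boolean inDouble = false;  // inside a "..." literal
        for (int i = 0; i < script.length(); i++) {
            char c = script.charAt(i);
            if (c == '\'' && !inDouble) {
                inSingle = !inSingle;
            } else if (c == '"' && !inSingle) {
                inDouble = !inDouble;
            } else if (c == ';' && !inSingle && !inDouble) {
                // statement boundary: emit the accumulated text, if any
                String stmt = current.toString().trim();
                if (!stmt.isEmpty()) {
                    statements.add(stmt);
                }
                current.setLength(0);
                continue;
            }
            current.append(c);
        }
        String tail = current.toString().trim();
        if (!tail.isEmpty()) {
            statements.add(tail);
        }
        return statements;
    }

    public static void main(String[] args) {
        String script = "USE my_database;\n"
                + "FROM my_database_src.base_table\n"
                + "INSERT OVERWRITE refined_table\n"
                + "SELECT *;";
        for (String stmt : split(script)) {
            System.out.println("Statement: " + stmt);
        }
    }
}
```

Each returned statement could then be executed in order on one shared JDBC connection, which is what keeps session-scoped features like temporary tables usable across statements.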
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)