[
https://issues.apache.org/jira/browse/NIFI-3432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15866157#comment-15866157
]
ASF GitHub Bot commented on NIFI-3432:
--------------------------------------
Github user patricker commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1471#discussion_r101090170
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
---
@@ -192,31 +192,57 @@ public void process(InputStream in) throws IOException {
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
st.setQueryTimeout(queryTimeout); // timeout in seconds
- final AtomicLong nrOfRows = new AtomicLong(0L);
- if (fileToProcess == null) {
- fileToProcess = session.create();
- }
- fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws IOException {
- try {
- logger.debug("Executing query {}", new Object[]{selectQuery});
- final ResultSet resultSet = st.executeQuery(selectQuery);
- nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, convertNamesForAvro));
- } catch (final SQLException e) {
- throw new ProcessException(e);
- }
+
+
+ logger.debug("Executing query {}", new Object[]{selectQuery});
+ boolean results = st.execute(selectQuery);
+ int resultCount = 0;
+ while(results){
+ FlowFile resultSetFF;
+ if(fileToProcess==null)
+ resultSetFF = session.create();
+ else {
+ resultSetFF = session.create(fileToProcess);
+ resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
}
- });
- // set attribute how many rows were selected
- fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+ final AtomicLong nrOfRows = new AtomicLong(0L);
+
+ resultSetFF = session.write(resultSetFF, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws IOException {
+ try {
+ ResultSet resultSet = st.getResultSet();
+ nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, convertNamesForAvro));
+ } catch (final SQLException e) {
+ throw new ProcessException(e);
+ }
+ }
+ });
+
+ // set attribute how many rows were selected
+ resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+
+ logger.info("{} contains {} Avro records; transferring to 'success'",
+ new Object[]{resultSetFF, nrOfRows.get()});
+ session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows",
+ stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ session.transfer(resultSetFF, REL_SUCCESS);
+ resultCount++;
+
+ // are there anymore result sets?
+ results = st.getMoreResults();
+ }
- logger.info("{} contains {} Avro records; transferring to 'success'",
- new Object[]{fileToProcess, nrOfRows.get()});
- session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
- stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- session.transfer(fileToProcess, REL_SUCCESS);
+ //If we had at least one result then it's OK to drop the original file, but if we had no results then
+ // pass the original flow file down the line to trigger downstream processors
+ if(fileToProcess != null) {
+ if (resultCount > 0) {
+ session.remove(fileToProcess);
+ } else {
+ session.transfer(fileToProcess, REL_SUCCESS);
--- End diff --
@mattyb149, thanks for the review. In the original code, `fileToProcess`
was overwritten with the results generated by the query and was always
transferred. In my original PR I always dropped the original FlowFile, since
it served no purpose. Then I happened to execute a SQL statement that
returned no ResultSet. In that scenario the original FlowFile was dropped and
no FlowFiles were output at all. That is when I realized I needed to keep the
original FlowFile around when there are no result sets. The logic above
removes the original FlowFile under normal circumstances,
`if (resultCount > 0)`, and only keeps it when the SQL returns no result
sets. I don't think my explanation matches up with your observations; please
let me know if this helps, or if there is still something wrong with the code
that I missed.
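To make the intended behavior easier to check, here is a minimal sketch of the
two pieces of logic described above, pulled out of the NiFi session handling.
It is not the code in the PR: the class `MultiResultSetSketch`, the enum
`OriginalAction`, the interface `ResultSetHandler`, and both method names are
hypothetical and exist only for illustration. The only real API used is the
JDBC `Statement` interface (`execute`, `getResultSet`, `getMoreResults`).

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

final class MultiResultSetSketch {

    /** Hypothetical label for what happens to the incoming FlowFile. */
    enum OriginalAction { NONE, REMOVE, TRANSFER_TO_SUCCESS }

    /** Hypothetical callback; in the PR this role is played by the Avro conversion. */
    interface ResultSetHandler {
        void handle(ResultSet rs) throws SQLException;
    }

    /**
     * Mirrors the if/else at the end of the diff: drop the original only
     * when at least one result set produced its own FlowFile.
     */
    static OriginalAction decideOriginal(boolean hasIncoming, int resultCount) {
        if (!hasIncoming) {
            return OriginalAction.NONE; // nothing came in, nothing to route
        }
        return resultCount > 0 ? OriginalAction.REMOVE
                               : OriginalAction.TRANSFER_TO_SUCCESS;
    }

    /**
     * Walks every result set a statement returns and reports how many it saw.
     * Like the loop in the diff, it stops as soon as the next result is not a
     * ResultSet (for example an update count).
     */
    static int forEachResultSet(Statement st, String sql, ResultSetHandler handler)
            throws SQLException {
        int resultCount = 0;
        boolean results = st.execute(sql);
        while (results) {
            try (ResultSet rs = st.getResultSet()) {
                handler.handle(rs);
            }
            resultCount++;
            results = st.getMoreResults(); // advance to the next result, if any
        }
        return resultCount;
    }

    public static void main(String[] args) {
        // Exercise only the routing decision; no database is needed for this part.
        System.out.println(decideOriginal(true, 0));
        System.out.println(decideOriginal(true, 2));
        System.out.println(decideOriginal(false, 0));
    }
}
```

With this split, the "no ResultSet" case I ran into corresponds to
`forEachResultSet` returning 0, after which `decideOriginal(true, 0)` keeps the
original FlowFile and sends it to success.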
> ExecuteSQL Should Support Multiple ResultSets
> ---------------------------------------------
>
> Key: NIFI-3432
> URL: https://issues.apache.org/jira/browse/NIFI-3432
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Core Framework
> Affects Versions: 1.2.0
> Reporter: Peter Wicks
> Assignee: Peter Wicks
> Priority: Minor
> Fix For: 1.2.0
>
>
> ExecuteSQL processor only supports processing a single resultset. If a
> query/stored procedure call returns multiple resultsets then only one is kept.
> ExecuteSQL should be updated to support handling multiple resultsets. When
> multiple resultsets exist a flow file should be created for each resultset.