[
https://issues.apache.org/jira/browse/NIFI-3432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15866388#comment-15866388
]
ASF GitHub Bot commented on NIFI-3432:
--------------------------------------
Github user patricker commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1471#discussion_r101112040
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
---
@@ -192,31 +192,57 @@ public void process(InputStream in) throws
IOException {
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
st.setQueryTimeout(queryTimeout); // timeout in seconds
- final AtomicLong nrOfRows = new AtomicLong(0L);
- if (fileToProcess == null) {
- fileToProcess = session.create();
- }
- fileToProcess = session.write(fileToProcess, new
OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws
IOException {
- try {
- logger.debug("Executing query {}", new
Object[]{selectQuery});
- final ResultSet resultSet =
st.executeQuery(selectQuery);
-
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out,
convertNamesForAvro));
- } catch (final SQLException e) {
- throw new ProcessException(e);
- }
+
+
+ logger.debug("Executing query {}", new Object[]{selectQuery});
+ boolean results = st.execute(selectQuery);
+ int resultCount = 0;
+ while(results){
+ FlowFile resultSetFF;
+ if(fileToProcess==null)
+ resultSetFF = session.create();
+ else {
+ resultSetFF = session.create(fileToProcess);
+ resultSetFF = session.putAllAttributes(resultSetFF,
fileToProcess.getAttributes());
}
- });
- // set attribute how many rows were selected
- fileToProcess = session.putAttribute(fileToProcess,
RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+ final AtomicLong nrOfRows = new AtomicLong(0L);
+
+ resultSetFF = session.write(resultSetFF, new
OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws
IOException {
+ try {
+ ResultSet resultSet = st.getResultSet();
+
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out,
convertNamesForAvro));
+ } catch (final SQLException e) {
+ throw new ProcessException(e);
+ }
+ }
+ });
+
+ // set attribute how many rows were selected
+ resultSetFF = session.putAttribute(resultSetFF,
RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+
+ logger.info("{} contains {} Avro records; transferring to
'success'",
+ new Object[]{resultSetFF, nrOfRows.get()});
+ session.getProvenanceReporter().modifyContent(resultSetFF,
"Retrieved " + nrOfRows.get() + " rows",
+ stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ session.transfer(resultSetFF, REL_SUCCESS);
+ resultCount++;
+
+ // are there anymore result sets?
+ results = st.getMoreResults();
+ }
- logger.info("{} contains {} Avro records; transferring to
'success'",
- new Object[]{fileToProcess, nrOfRows.get()});
- session.getProvenanceReporter().modifyContent(fileToProcess,
"Retrieved " + nrOfRows.get() + " rows",
- stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- session.transfer(fileToProcess, REL_SUCCESS);
+ //If we had at least one result then it's OK to drop the
original file, but if we had no results then
+ // pass the original flow file down the line to trigger
downstream processors
+ if(fileToProcess != null) {
+ if (resultCount > 0) {
+ session.remove(fileToProcess);
+ } else {
+ session.transfer(fileToProcess, REL_SUCCESS);
--- End diff --
I went out and ran the original code base. A query that returns no results
causes an empty Avro schema. I replicated this in my code, and built a test
case.
> ExecuteSQL Should Support Multiple ResultSets
> ---------------------------------------------
>
> Key: NIFI-3432
> URL: https://issues.apache.org/jira/browse/NIFI-3432
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Core Framework
> Affects Versions: 1.2.0
> Reporter: Peter Wicks
> Assignee: Peter Wicks
> Priority: Minor
> Fix For: 1.2.0
>
>
> ExecuteSQL processor only supports processing a single resultset. If a
> query/stored procedure call returns multiple resultsets then only one is kept.
> ExecuteSQL should be updated to support handling multiple resultsets. When
> multiple resultsets exist a flow file should be created for each resultset.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)