Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1471#discussion_r142700529
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
---
@@ -196,44 +196,80 @@ public void process(InputStream in) throws
IOException {
selectQuery = queryContents.toString();
}
+ int resultCount=0;
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
st.setQueryTimeout(queryTimeout); // timeout in seconds
- final AtomicLong nrOfRows = new AtomicLong(0L);
- if (fileToProcess == null) {
- fileToProcess = session.create();
- }
- fileToProcess = session.write(fileToProcess, new
OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws
IOException {
- try {
- logger.debug("Executing query {}", new
Object[]{selectQuery});
- final ResultSet resultSet =
st.executeQuery(selectQuery);
- final JdbcCommon.AvroConversionOptions options =
JdbcCommon.AvroConversionOptions.builder()
- .convertNames(convertNamesForAvro)
- .useLogicalTypes(useAvroLogicalTypes)
- .defaultPrecision(defaultPrecision)
- .defaultScale(defaultScale)
- .build();
-
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
- } catch (final SQLException e) {
- throw new ProcessException(e);
- }
+
+ logger.debug("Executing query {}", new Object[]{selectQuery});
+ boolean results = st.execute(selectQuery);
+
+
+ while(results){
+ FlowFile resultSetFF;
+ if(fileToProcess == null){
+ resultSetFF = session.create();
+ } else {
+ resultSetFF = session.create(fileToProcess);
+ resultSetFF = session.putAllAttributes(resultSetFF,
fileToProcess.getAttributes());
}
- });
- long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
+ final AtomicLong nrOfRows = new AtomicLong(0L);
+ resultSetFF = session.write(resultSetFF, new
OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws
IOException {
+ try {
- // set attribute how many rows were selected
- fileToProcess = session.putAttribute(fileToProcess,
RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
- fileToProcess = session.putAttribute(fileToProcess,
RESULT_QUERY_DURATION, String.valueOf(duration));
- fileToProcess = session.putAttribute(fileToProcess,
CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
+ final ResultSet resultSet = st.getResultSet();
+ final JdbcCommon.AvroConversionOptions options
= JdbcCommon.AvroConversionOptions.builder()
+ .convertNames(convertNamesForAvro)
+ .useLogicalTypes(useAvroLogicalTypes)
+ .defaultPrecision(defaultPrecision)
+ .defaultScale(defaultScale)
+ .build();
+
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
+ } catch (final SQLException e) {
+ throw new ProcessException(e);
+ }
+ }
+ });
- logger.info("{} contains {} Avro records; transferring to
'success'",
- new Object[]{fileToProcess, nrOfRows.get()});
- session.getProvenanceReporter().modifyContent(fileToProcess,
"Retrieved " + nrOfRows.get() + " rows", duration);
- session.transfer(fileToProcess, REL_SUCCESS);
+ long duration =
stopWatch.getElapsed(TimeUnit.MILLISECONDS);
+
+ // set attribute how many rows were selected
+ resultSetFF = session.putAttribute(resultSetFF,
RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+ resultSetFF = session.putAttribute(resultSetFF,
RESULT_QUERY_DURATION, String.valueOf(duration));
+ resultSetFF = session.putAttribute(resultSetFF,
CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
+
+ logger.info("{} contains {} Avro records; transferring to
'success'",
+ new Object[]{resultSetFF, nrOfRows.get()});
+ session.getProvenanceReporter().modifyContent(resultSetFF,
"Retrieved " + nrOfRows.get() + " rows", duration);
+ session.transfer(resultSetFF, REL_SUCCESS);
+
+ resultCount++;
+ // are there anymore result sets?
+ results = st.getMoreResults();
--- End diff --
This call to `st.getMoreResults()` should probably be wrapped in a try/catch
for SQLException; some (usually younger or cross-domain) JDBC drivers may not
support this method and may throw an exception here. Also, I think at least one
driver (can't remember which one) closes the result set when the last row has
been retrieved, in which case this call would generate a SQLException as well.
In either case, we can just set `results` to false if an exception occurs and
proceed from there. What do you think?
---