This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2363e63c86 NIFI-13359 Tune ExecuteSQL/Record to create fewer transient flow files
2363e63c86 is described below
commit 2363e63c86b09dac820ee848efe848816524956f
Author: Jim Steinebrey <[email protected]>
AuthorDate: Tue Jun 4 15:01:18 2024 -0400
NIFI-13359 Tune ExecuteSQL/Record to create fewer transient flow files
Signed-off-by: Matt Burgess <[email protected]>
This closes #8928
---
.../processors/standard/AbstractExecuteSQL.java | 57 +++++++++++-----------
1 file changed, 28 insertions(+), 29 deletions(-)
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index 57c3438ce5..104bc06ba9 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -333,11 +333,6 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
resultSetFF =
session.create(fileToProcess);
}
- if (inputFileAttrMap != null) {
- resultSetFF =
session.putAllAttributes(resultSetFF, inputFileAttrMap);
- }
-
-
try {
resultSetFF = session.write(resultSetFF,
out -> {
try {
@@ -347,10 +342,20 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
}
});
+ // if fragmented ResultSet, determine if
we should keep this fragment
+ if (maxRowsPerFlowFile > 0 &&
nrOfRows.get() == 0 && fragmentIndex > 0) {
+ // if row count is zero and this is
not the first fragment, drop it instead of committing it.
+ session.remove(resultSetFF);
+ break;
+ }
+
long fetchTimeElapsed =
fetchTime.getElapsed(TimeUnit.MILLISECONDS);
// set attributes
final Map<String, String> attributesToAdd
= new HashMap<>();
+ if (inputFileAttrMap != null) {
+
attributesToAdd.putAll(inputFileAttrMap);
+ }
attributesToAdd.put(RESULT_ROW_COUNT,
String.valueOf(nrOfRows.get()));
attributesToAdd.put(RESULT_QUERY_DURATION,
String.valueOf(executionTimeElapsed + fetchTimeElapsed));
attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME,
String.valueOf(executionTimeElapsed));
@@ -359,22 +364,15 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
if (inputFileUUID != null) {
attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID);
}
+ if (maxRowsPerFlowFile > 0) {
+ // if fragmented ResultSet, set
fragment attributes
+ attributesToAdd.put(FRAGMENT_ID,
fragmentId);
+ attributesToAdd.put(FRAGMENT_INDEX,
String.valueOf(fragmentIndex));
+ }
attributesToAdd.putAll(sqlWriter.getAttributesToAdd());
resultSetFF =
session.putAllAttributes(resultSetFF, attributesToAdd);
sqlWriter.updateCounters(session);
- // if fragmented ResultSet, determine if
we should keep this fragment; set fragment attributes
- if (maxRowsPerFlowFile > 0) {
- // if row count is zero and this is
not the first fragment, drop it instead of committing it.
- if (nrOfRows.get() == 0 &&
fragmentIndex > 0) {
- session.remove(resultSetFF);
- break;
- }
-
- resultSetFF =
session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
- resultSetFF =
session.putAttribute(resultSetFF, FRAGMENT_INDEX,
String.valueOf(fragmentIndex));
- }
-
logger.info("{} contains {} records;
transferring to 'success'", resultSetFF, nrOfRows.get());
// Report a FETCH event if there was an
incoming flow file, or a RECEIVE event otherwise
@@ -452,26 +450,19 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
session.transfer(resultSetFlowFiles, REL_SUCCESS);
resultSetFlowFiles.clear();
- //If we had at least one result then it's OK to drop the
original file, but if we had no results then
- // pass the original flow file down the line to trigger
downstream processors
if (fileToProcess != null) {
if (resultCount > 0) {
+ // If we had at least one result then it's OK to drop
the original file
session.remove(fileToProcess);
} else {
- fileToProcess = session.write(fileToProcess, out ->
sqlWriter.writeEmptyResultSet(out, getLogger()));
- fileToProcess = session.putAttribute(fileToProcess,
RESULT_ROW_COUNT, "0");
- fileToProcess = session.putAttribute(fileToProcess,
CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
- session.transfer(fileToProcess, REL_SUCCESS);
+ // If we had no results then transfer the original
flow file downstream to trigger processors
+ session.transfer(setFlowFileEmptyResults(session,
fileToProcess, sqlWriter), REL_SUCCESS);
}
} else if (resultCount == 0) {
- //If we had no inbound FlowFile, no exceptions, and the
SQL generated no result sets (Insert/Update/Delete statements only)
+ // If we had no inbound FlowFile, no exceptions, and the
SQL generated no result sets (Insert/Update/Delete statements only)
// Then generate an empty Output FlowFile
FlowFile resultSetFF = session.create();
-
- resultSetFF = session.write(resultSetFF, out ->
sqlWriter.writeEmptyResultSet(out, getLogger()));
- resultSetFF = session.putAttribute(resultSetFF,
RESULT_ROW_COUNT, "0");
- resultSetFF = session.putAttribute(resultSetFF,
CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
- session.transfer(resultSetFF, REL_SUCCESS);
+ session.transfer(setFlowFileEmptyResults(session,
resultSetFF, sqlWriter), REL_SUCCESS);
}
}
} catch (final ProcessException | SQLException e) {
@@ -495,6 +486,14 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
}
}
+ protected FlowFile setFlowFileEmptyResults(final ProcessSession session,
FlowFile flowFile, SqlWriter sqlWriter) {
+ flowFile = session.write(flowFile, out ->
sqlWriter.writeEmptyResultSet(out, getLogger()));
+ final Map<String, String> attributesToAdd = new HashMap<>();
+ attributesToAdd.put(RESULT_ROW_COUNT, "0");
+ attributesToAdd.put(CoreAttributes.MIME_TYPE.key(),
sqlWriter.getMimeType());
+ return session.putAllAttributes(flowFile, attributesToAdd);
+ }
+
/*
* Executes given queries using pre-defined connection.
* Returns null on success, or a query string if failed.