This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2363e63c86 NIFI-13359 Tune ExecuteSQL/Record to create fewer transient flow files
2363e63c86 is described below

commit 2363e63c86b09dac820ee848efe848816524956f
Author: Jim Steinebrey <[email protected]>
AuthorDate: Tue Jun 4 15:01:18 2024 -0400

    NIFI-13359 Tune ExecuteSQL/Record to create fewer transient flow files
    
    Signed-off-by: Matt Burgess <[email protected]>
    
    This closes #8928
---
 .../processors/standard/AbstractExecuteSQL.java    | 57 +++++++++++-----------
 1 file changed, 28 insertions(+), 29 deletions(-)

diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index 57c3438ce5..104bc06ba9 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -333,11 +333,6 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
                                     resultSetFF = 
session.create(fileToProcess);
                                 }
 
-                                if (inputFileAttrMap != null) {
-                                    resultSetFF = 
session.putAllAttributes(resultSetFF, inputFileAttrMap);
-                                }
-
-
                                 try {
                                     resultSetFF = session.write(resultSetFF, 
out -> {
                                         try {
@@ -347,10 +342,20 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
                                         }
                                     });
 
+                                    // if fragmented ResultSet, determine if 
we should keep this fragment
+                                    if (maxRowsPerFlowFile > 0 && 
nrOfRows.get() == 0 && fragmentIndex > 0) {
+                                        // if row count is zero and this is 
not the first fragment, drop it instead of committing it.
+                                        session.remove(resultSetFF);
+                                        break;
+                                    }
+
                                     long fetchTimeElapsed = 
fetchTime.getElapsed(TimeUnit.MILLISECONDS);
 
                                     // set attributes
                                     final Map<String, String> attributesToAdd 
= new HashMap<>();
+                                    if (inputFileAttrMap != null) {
+                                        
attributesToAdd.putAll(inputFileAttrMap);
+                                    }
                                     attributesToAdd.put(RESULT_ROW_COUNT, 
String.valueOf(nrOfRows.get()));
                                     attributesToAdd.put(RESULT_QUERY_DURATION, 
String.valueOf(executionTimeElapsed + fetchTimeElapsed));
                                     
attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, 
String.valueOf(executionTimeElapsed));
@@ -359,22 +364,15 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
                                     if (inputFileUUID != null) {
                                         
attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID);
                                     }
+                                    if (maxRowsPerFlowFile > 0) {
+                                        // if fragmented ResultSet, set 
fragment attributes
+                                        attributesToAdd.put(FRAGMENT_ID, 
fragmentId);
+                                        attributesToAdd.put(FRAGMENT_INDEX, 
String.valueOf(fragmentIndex));
+                                    }
                                     
attributesToAdd.putAll(sqlWriter.getAttributesToAdd());
                                     resultSetFF = 
session.putAllAttributes(resultSetFF, attributesToAdd);
                                     sqlWriter.updateCounters(session);
 
-                                    // if fragmented ResultSet, determine if 
we should keep this fragment; set fragment attributes
-                                    if (maxRowsPerFlowFile > 0) {
-                                        // if row count is zero and this is 
not the first fragment, drop it instead of committing it.
-                                        if (nrOfRows.get() == 0 && 
fragmentIndex > 0) {
-                                            session.remove(resultSetFF);
-                                            break;
-                                        }
-
-                                        resultSetFF = 
session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
-                                        resultSetFF = 
session.putAttribute(resultSetFF, FRAGMENT_INDEX, 
String.valueOf(fragmentIndex));
-                                    }
-
                                     logger.info("{} contains {} records; 
transferring to 'success'", resultSetFF, nrOfRows.get());
 
                                     // Report a FETCH event if there was an 
incoming flow file, or a RECEIVE event otherwise
@@ -452,26 +450,19 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
                 session.transfer(resultSetFlowFiles, REL_SUCCESS);
                 resultSetFlowFiles.clear();
 
-                //If we had at least one result then it's OK to drop the 
original file, but if we had no results then
-                //  pass the original flow file down the line to trigger 
downstream processors
                 if (fileToProcess != null) {
                     if (resultCount > 0) {
+                        // If we had at least one result then it's OK to drop 
the original file
                         session.remove(fileToProcess);
                     } else {
-                        fileToProcess = session.write(fileToProcess, out -> 
sqlWriter.writeEmptyResultSet(out, getLogger()));
-                        fileToProcess = session.putAttribute(fileToProcess, 
RESULT_ROW_COUNT, "0");
-                        fileToProcess = session.putAttribute(fileToProcess, 
CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
-                        session.transfer(fileToProcess, REL_SUCCESS);
+                        // If we had no results then transfer the original 
flow file downstream to trigger processors
+                        session.transfer(setFlowFileEmptyResults(session, 
fileToProcess, sqlWriter), REL_SUCCESS);
                     }
                 } else if (resultCount == 0) {
-                    //If we had no inbound FlowFile, no exceptions, and the 
SQL generated no result sets (Insert/Update/Delete statements only)
+                    // If we had no inbound FlowFile, no exceptions, and the 
SQL generated no result sets (Insert/Update/Delete statements only)
                     // Then generate an empty Output FlowFile
                     FlowFile resultSetFF = session.create();
-
-                    resultSetFF = session.write(resultSetFF, out -> 
sqlWriter.writeEmptyResultSet(out, getLogger()));
-                    resultSetFF = session.putAttribute(resultSetFF, 
RESULT_ROW_COUNT, "0");
-                    resultSetFF = session.putAttribute(resultSetFF, 
CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
-                    session.transfer(resultSetFF, REL_SUCCESS);
+                    session.transfer(setFlowFileEmptyResults(session, 
resultSetFF, sqlWriter), REL_SUCCESS);
                 }
             }
         } catch (final ProcessException | SQLException e) {
@@ -495,6 +486,14 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
         }
     }
 
+    protected FlowFile setFlowFileEmptyResults(final ProcessSession session, 
FlowFile flowFile, SqlWriter sqlWriter) {
+        flowFile = session.write(flowFile, out -> 
sqlWriter.writeEmptyResultSet(out, getLogger()));
+        final Map<String, String> attributesToAdd = new HashMap<>();
+        attributesToAdd.put(RESULT_ROW_COUNT, "0");
+        attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), 
sqlWriter.getMimeType());
+        return session.putAllAttributes(flowFile, attributesToAdd);
+    }
+
     /*
      * Executes given queries using pre-defined connection.
      * Returns null on success, or a query string if failed.

Reply via email to