turcsanyip commented on a change in pull request #5550:
URL: https://github.com/apache/nifi/pull/5550#discussion_r762804091



##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
##########
@@ -323,56 +323,64 @@ public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory
             }
         }
 
-        final ProcessSession session = sessionFactory.createSession();
-        final List<FlowFile> flowFiles = 
session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
-        if (getLogger().isDebugEnabled()) {
-            final List<String> ids = flowFiles.stream().map(ff -> "id=" + 
ff.getId()).collect(Collectors.toList());
-            getLogger().debug("Pulled {} FlowFiles from queue: {}", new 
Object[] {ids.size(), ids});
-        }
+        while (isScheduled()) {
+            final ProcessSession session = sessionFactory.createSession();
+            final List<FlowFile> flowFiles = 
session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+            if (flowFiles.isEmpty()) {
+                break;
+            }
+            if (getLogger().isDebugEnabled()) {
+                final List<String> ids = flowFiles.stream().map(ff -> "id=" + 
ff.getId()).collect(Collectors.toList());
+                getLogger().debug("Pulled {} FlowFiles from queue: {}", 
ids.size(), ids);
+            }
 
-        final String mergeStrategy = 
context.getProperty(MERGE_STRATEGY).getValue();
-        final boolean block;
-        if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
-            block = true;
-        } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
-            block = true;
-        } else {
-            block = false;
-        }
+            final String mergeStrategy = 
context.getProperty(MERGE_STRATEGY).getValue();
+            final boolean block;
+            if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+                block = true;
+            } else if 
(context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
+                block = true;
+            } else {
+                block = false;
+            }
 
-        try {
-            for (final FlowFile flowFile : flowFiles) {
-                try {
-                    binFlowFile(context, flowFile, session, manager, block);
-                } catch (final Exception e) {
-                    getLogger().error("Failed to bin {} due to {}", new 
Object[] {flowFile, e});
-                    session.transfer(flowFile, REL_FAILURE);
+            try {
+                for (final FlowFile flowFile : flowFiles) {
+                    try {
+                        binFlowFile(context, flowFile, session, manager, 
block);
+                    } catch (final Exception e) {
+                        getLogger().error("Failed to bin {} due to {}", 
flowFile, e);
+                        session.transfer(flowFile, REL_FAILURE);
+                    }
                 }
+            } finally {
+                session.commitAsync();
             }
-        } finally {
-            session.commitAsync();
-        }
 
-        // If there is no more data queued up, or strategy is defragment, 
complete any bin that meets our minimum threshold
-        // Otherwise, run one more cycle to process queued FlowFiles to add 
more fragment into available bins.
-        int completedBins = 0;
-        if (flowFiles.isEmpty() || 
MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+            // Complete any bins that have reached their expiration date
             try {
-                completedBins += manager.completeFullEnoughBins();
+                manager.completeExpiredBins();
             } catch (final Exception e) {
-                getLogger().error("Failed to merge FlowFiles to create new bin 
due to " + e, e);
+                getLogger().error("Failed to merge FlowFiles to create new bin 
due to {}", e);

Review comment:
       @markap14 Thanks for catching it. Modified the log statements.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to