This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 979d168323 NIFI-15284 Added debug logging to Wait and Notify 
Processors (#10592)
979d168323 is described below

commit 979d1683239929d8102bc78b5e366f1f86458b98
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Dec 2 16:53:28 2025 +0100

    NIFI-15284 Added debug logging to Wait and Notify Processors (#10592)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../apache/nifi/processors/standard/Notify.java    | 16 ++++++-
 .../org/apache/nifi/processors/standard/Wait.java  | 55 +++++++++++++++++++++-
 2 files changed, 69 insertions(+), 2 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
index 4e56587f8f..ac803e9cac 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
@@ -227,6 +227,11 @@ public class Notify extends AbstractProcessor {
                 }
             }
 
+            if (logger.isDebugEnabled()) {
+                logger.debug("Evaluated FlowFile {} to signalId='{}' 
counterName='{}' delta={} bufferCount={} attrRegex='{}'",
+                        flowFile, signalId, counterName, delta, bufferCount, 
attributeCacheRegex);
+            }
+
             if (!signalBuffers.containsKey(signalId)) {
                 signalBuffers.put(signalId, new SignalBuffer());
             }
@@ -236,6 +241,9 @@ public class Notify extends AbstractProcessor {
                 flowFile.getAttributes().entrySet()
                         .stream().filter(e -> (!e.getKey().equals("uuid") && 
e.getKey().matches(attributeCacheRegex)))
                         .forEach(e -> 
signalBuffer.attributesToCache.put(e.getKey(), e.getValue()));
+                if (logger.isDebugEnabled()) {
+                    logger.debug("FlowFile {} added cached attributes {} for 
signalId='{}'", flowFile, signalBuffer.attributesToCache.keySet(), signalId);
+                }
             }
 
             signalBuffer.incrementDelta(counterName, delta);
@@ -250,7 +258,13 @@ public class Notify extends AbstractProcessor {
             // In case of Exception, just throw the exception so that 
processor can
             // retry after yielding for a while.
             try {
-                protocol.notify(signalId, signalBuffer.deltas, 
signalBuffer.attributesToCache);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Notifying signalId='{}' with deltas {} and 
cached attributes {}", signalId, signalBuffer.deltas, 
signalBuffer.attributesToCache);
+                }
+                final WaitNotifyProtocol.Signal signal = 
protocol.notify(signalId, signalBuffer.deltas, signalBuffer.attributesToCache);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Post-notify state for signalId='{}': 
counts={} attributesKeys={}", signalId, signal.getCounts(), 
signal.getAttributes().keySet());
+                }
                 signalBuffer.flowFiles.forEach(flowFile ->
                         session.transfer(session.putAttribute(flowFile, 
NOTIFIED_ATTRIBUTE_NAME, String.valueOf(true)), REL_SUCCESS));
             } catch (IOException e) {
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
index 4c64f1b24c..f653f31f3a 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
@@ -324,6 +324,9 @@ public class Wait extends AbstractProcessor {
 
             if (signalIdPenalties.containsKey(fSignalId)) {
                 // This id is penalized.
+                if (logger.isDebugEnabled()) {
+                    logger.debug("FlowFile {} with signalId='{}' is currently 
penalized until {}", f, fSignalId, signalIdPenalties.get(fSignalId));
+                }
                 return REJECT_AND_CONTINUE;
             }
 
@@ -331,6 +334,9 @@ public class Wait extends AbstractProcessor {
             if (targetSignalIdStr == null) {
                 // This is the first one.
                 targetSignalId.set(fSignalId);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Selecting signalId='{}' for this trigger 
batch", fSignalId);
+                }
                 return acceptResultSupplier.get();
             }
 
@@ -338,6 +344,9 @@ public class Wait extends AbstractProcessor {
                 return acceptResultSupplier.get();
             }
 
+            if (logger.isDebugEnabled()) {
+                logger.debug("FlowFile {} uses different signalId='{}' and is 
skipped for this trigger batch targeting '{}'", f, fSignalId, 
targetSignalIdStr);
+            }
             return REJECT_AND_CONTINUE;
 
         });
@@ -393,6 +402,10 @@ public class Wait extends AbstractProcessor {
             return;
         }
 
+        if (logger.isDebugEnabled()) {
+            logger.debug("Processing {} FlowFile(s) for signalId='{}'", 
flowFiles.size(), targetSignalId.get());
+        }
+
         // the cache client used to interact with the distributed cache
         final AtomicDistributedMapCacheClient cache = 
context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
         final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
@@ -403,6 +416,14 @@ public class Wait extends AbstractProcessor {
         // get notifying signal
         try {
             signal = protocol.getSignal(signalId);
+            if (logger.isDebugEnabled()) {
+                if (signal == null) {
+                    logger.debug("No signal found in cache for signalId='{}'", 
signalId);
+                } else {
+                    logger.debug("Fetched signal for signalId='{}': counts={} 
attributesKeys={} releasableCount={}",
+                            signalId, signal.getCounts(), 
signal.getAttributes().keySet(), signal.getReleasableCount());
+                }
+            }
             if (signal != null) {
                 originalSignalCounts.putAll(signal.getCounts());
             }
@@ -470,6 +491,10 @@ public class Wait extends AbstractProcessor {
                     logger.error("Failed to parse releasableFlowFileCount when 
processing {} due to {}", flowFile, e, e);
                     continue;
                 }
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Using targetCounterName='{}' targetCount={} 
releasableFlowFileCount={} for signalId='{}'",
+                            targetCounterName, targetCount, 
releasableFlowFileCount, signalId);
+                }
             }
 
             // FlowFile is now validated and added to candidates.
@@ -481,11 +506,20 @@ public class Wait extends AbstractProcessor {
         if (signal != null && !candidates.isEmpty()) {
 
             if (releasableFlowFileCount > 0) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Evaluating releaseCandidates for 
signalId='{}' counts={} targetCounterName='{}' targetCount={} 
releasableFlowFileCount={} existingReleasableCount={} candidates={}",
+                            signalId, signal.getCounts(), targetCounterName, 
targetCount, releasableFlowFileCount, signal.getReleasableCount(), 
candidates.size());
+                }
                 signal.releaseCandidates(targetCounterName, targetCount, 
releasableFlowFileCount, candidates,
                         released -> 
getFlowFilesFor.apply(REL_SUCCESS).addAll(released),
                         waiting -> 
getFlowFilesFor.apply(REL_WAIT).addAll(waiting));
                 waitCompleted = signal.getTotalCount() == 0 && 
signal.getReleasableCount() == 0;
                 waitProgressed = !getFlowFilesFor.apply(REL_SUCCESS).isEmpty();
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Release evaluation result for signalId='{}': 
released={} waiting={} remainingCounts={} remainingReleasable={}",
+                            signalId, 
getFlowFilesFor.apply(REL_SUCCESS).size(), 
getFlowFilesFor.apply(REL_WAIT).size(),
+                            signal.getCounts(), signal.getReleasableCount());
+                }
 
             } else {
                 boolean reachedTargetCount = 
StringUtils.isBlank(targetCounterName)
@@ -497,24 +531,43 @@ public class Wait extends AbstractProcessor {
                 } else {
                     getFlowFilesFor.apply(REL_WAIT).addAll(candidates);
                 }
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Zero-release-count mode for signalId='{}': 
reachedTargetCount={} candidates={} routedToSuccess={} routedToWait={}",
+                            signalId, reachedTargetCount, candidates.size(), 
reachedTargetCount ? candidates.size() : 0, reachedTargetCount ? 0 : 
candidates.size());
+                }
             }
         }
 
         // Transfer FlowFiles.
         processedFlowFiles.entrySet().forEach(transferFlowFiles);
+        if (logger.isDebugEnabled() && !processedFlowFiles.isEmpty()) {
+            processedFlowFiles.forEach((rel, list) ->
+                    logger.debug("Routing {} FlowFile(s) to relationship '{}' 
for signalId='{}'", list.size(), rel.getName(), signalId));
+        }
 
         // Penalize signal id if no FlowFile transferred to success.
         final PropertyValue waitPenaltyDuration = 
context.getProperty(WAIT_PENALTY_DURATION);
         if (waitPenaltyDuration.isSet() && 
getFlowFilesFor.apply(REL_SUCCESS).isEmpty()) {
-            signalIdPenalties.put(signalId, System.currentTimeMillis() + 
waitPenaltyDuration.asTimePeriod(TimeUnit.MILLISECONDS));
+            final long penaltyUntil = System.currentTimeMillis() + 
waitPenaltyDuration.asTimePeriod(TimeUnit.MILLISECONDS);
+            signalIdPenalties.put(signalId, penaltyUntil);
+            if (logger.isDebugEnabled()) {
+                logger.debug("Penalizing signalId='{}' until {} because no 
FlowFile was released this trigger", signalId, penaltyUntil);
+            }
         }
 
         // Update signal if needed.
         try {
             if (waitCompleted) {
                 protocol.complete(signalId);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Completed wait for signalId='{}' and removed 
signal from cache", signalId);
+                }
             } else if (waitProgressed) {
                 protocol.replace(signal);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Updated signal in cache for signalId='{}' 
with counts={} releasableCount={}",
+                            signalId, signal.getCounts(), 
signal.getReleasableCount());
+                }
             }
 
         } catch (final IOException e) {

Reply via email to