This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 979d168323 NIFI-15284 Added debug logging to Wait and Notify Processors (#10592)
979d168323 is described below
commit 979d1683239929d8102bc78b5e366f1f86458b98
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Dec 2 16:53:28 2025 +0100
NIFI-15284 Added debug logging to Wait and Notify Processors (#10592)
Signed-off-by: David Handermann <[email protected]>
---
.../apache/nifi/processors/standard/Notify.java | 16 ++++++-
.../org/apache/nifi/processors/standard/Wait.java | 55 +++++++++++++++++++++-
2 files changed, 69 insertions(+), 2 deletions(-)
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
index 4e56587f8f..ac803e9cac 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
@@ -227,6 +227,11 @@ public class Notify extends AbstractProcessor {
}
}
+ if (logger.isDebugEnabled()) {
+ logger.debug("Evaluated FlowFile {} to signalId='{}'
counterName='{}' delta={} bufferCount={} attrRegex='{}'",
+ flowFile, signalId, counterName, delta, bufferCount,
attributeCacheRegex);
+ }
+
if (!signalBuffers.containsKey(signalId)) {
signalBuffers.put(signalId, new SignalBuffer());
}
@@ -236,6 +241,9 @@ public class Notify extends AbstractProcessor {
flowFile.getAttributes().entrySet()
.stream().filter(e -> (!e.getKey().equals("uuid") &&
e.getKey().matches(attributeCacheRegex)))
.forEach(e ->
signalBuffer.attributesToCache.put(e.getKey(), e.getValue()));
+ if (logger.isDebugEnabled()) {
+ logger.debug("FlowFile {} added cached attributes {} for
signalId='{}'", flowFile, signalBuffer.attributesToCache.keySet(), signalId);
+ }
}
signalBuffer.incrementDelta(counterName, delta);
@@ -250,7 +258,13 @@ public class Notify extends AbstractProcessor {
// In case of Exception, just throw the exception so that
processor can
// retry after yielding for a while.
try {
- protocol.notify(signalId, signalBuffer.deltas,
signalBuffer.attributesToCache);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Notifying signalId='{}' with deltas {} and
cached attributes {}", signalId, signalBuffer.deltas,
signalBuffer.attributesToCache);
+ }
+ final WaitNotifyProtocol.Signal signal =
protocol.notify(signalId, signalBuffer.deltas, signalBuffer.attributesToCache);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Post-notify state for signalId='{}':
counts={} attributesKeys={}", signalId, signal.getCounts(),
signal.getAttributes().keySet());
+ }
signalBuffer.flowFiles.forEach(flowFile ->
session.transfer(session.putAttribute(flowFile,
NOTIFIED_ATTRIBUTE_NAME, String.valueOf(true)), REL_SUCCESS));
} catch (IOException e) {
diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
index 4c64f1b24c..f653f31f3a 100644
--- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
+++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
@@ -324,6 +324,9 @@ public class Wait extends AbstractProcessor {
if (signalIdPenalties.containsKey(fSignalId)) {
// This id is penalized.
+ if (logger.isDebugEnabled()) {
+ logger.debug("FlowFile {} with signalId='{}' is currently
penalized until {}", f, fSignalId, signalIdPenalties.get(fSignalId));
+ }
return REJECT_AND_CONTINUE;
}
@@ -331,6 +334,9 @@ public class Wait extends AbstractProcessor {
if (targetSignalIdStr == null) {
// This is the first one.
targetSignalId.set(fSignalId);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Selecting signalId='{}' for this trigger
batch", fSignalId);
+ }
return acceptResultSupplier.get();
}
@@ -338,6 +344,9 @@ public class Wait extends AbstractProcessor {
return acceptResultSupplier.get();
}
+ if (logger.isDebugEnabled()) {
+ logger.debug("FlowFile {} uses different signalId='{}' and is
skipped for this trigger batch targeting '{}'", f, fSignalId,
targetSignalIdStr);
+ }
return REJECT_AND_CONTINUE;
});
@@ -393,6 +402,10 @@ public class Wait extends AbstractProcessor {
return;
}
+ if (logger.isDebugEnabled()) {
+ logger.debug("Processing {} FlowFile(s) for signalId='{}'",
flowFiles.size(), targetSignalId.get());
+ }
+
// the cache client used to interact with the distributed cache
final AtomicDistributedMapCacheClient cache =
context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
@@ -403,6 +416,14 @@ public class Wait extends AbstractProcessor {
// get notifying signal
try {
signal = protocol.getSignal(signalId);
+ if (logger.isDebugEnabled()) {
+ if (signal == null) {
+ logger.debug("No signal found in cache for signalId='{}'",
signalId);
+ } else {
+ logger.debug("Fetched signal for signalId='{}': counts={}
attributesKeys={} releasableCount={}",
+ signalId, signal.getCounts(),
signal.getAttributes().keySet(), signal.getReleasableCount());
+ }
+ }
if (signal != null) {
originalSignalCounts.putAll(signal.getCounts());
}
@@ -470,6 +491,10 @@ public class Wait extends AbstractProcessor {
logger.error("Failed to parse releasableFlowFileCount when
processing {} due to {}", flowFile, e, e);
continue;
}
+ if (logger.isDebugEnabled()) {
+ logger.debug("Using targetCounterName='{}' targetCount={}
releasableFlowFileCount={} for signalId='{}'",
+ targetCounterName, targetCount,
releasableFlowFileCount, signalId);
+ }
}
// FlowFile is now validated and added to candidates.
@@ -481,11 +506,20 @@ public class Wait extends AbstractProcessor {
if (signal != null && !candidates.isEmpty()) {
if (releasableFlowFileCount > 0) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Evaluating releaseCandidates for
signalId='{}' counts={} targetCounterName='{}' targetCount={}
releasableFlowFileCount={} existingReleasableCount={} candidates={}",
+ signalId, signal.getCounts(), targetCounterName,
targetCount, releasableFlowFileCount, signal.getReleasableCount(),
candidates.size());
+ }
signal.releaseCandidates(targetCounterName, targetCount,
releasableFlowFileCount, candidates,
released ->
getFlowFilesFor.apply(REL_SUCCESS).addAll(released),
waiting ->
getFlowFilesFor.apply(REL_WAIT).addAll(waiting));
waitCompleted = signal.getTotalCount() == 0 &&
signal.getReleasableCount() == 0;
waitProgressed = !getFlowFilesFor.apply(REL_SUCCESS).isEmpty();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Release evaluation result for signalId='{}':
released={} waiting={} remainingCounts={} remainingReleasable={}",
+ signalId,
getFlowFilesFor.apply(REL_SUCCESS).size(),
getFlowFilesFor.apply(REL_WAIT).size(),
+ signal.getCounts(), signal.getReleasableCount());
+ }
} else {
boolean reachedTargetCount =
StringUtils.isBlank(targetCounterName)
@@ -497,24 +531,43 @@ public class Wait extends AbstractProcessor {
} else {
getFlowFilesFor.apply(REL_WAIT).addAll(candidates);
}
+ if (logger.isDebugEnabled()) {
+ logger.debug("Zero-release-count mode for signalId='{}':
reachedTargetCount={} candidates={} routedToSuccess={} routedToWait={}",
+ signalId, reachedTargetCount, candidates.size(),
reachedTargetCount ? candidates.size() : 0, reachedTargetCount ? 0 :
candidates.size());
+ }
}
}
// Transfer FlowFiles.
processedFlowFiles.entrySet().forEach(transferFlowFiles);
+ if (logger.isDebugEnabled() && !processedFlowFiles.isEmpty()) {
+ processedFlowFiles.forEach((rel, list) ->
+ logger.debug("Routing {} FlowFile(s) to relationship '{}'
for signalId='{}'", list.size(), rel.getName(), signalId));
+ }
// Penalize signal id if no FlowFile transferred to success.
final PropertyValue waitPenaltyDuration =
context.getProperty(WAIT_PENALTY_DURATION);
if (waitPenaltyDuration.isSet() &&
getFlowFilesFor.apply(REL_SUCCESS).isEmpty()) {
- signalIdPenalties.put(signalId, System.currentTimeMillis() +
waitPenaltyDuration.asTimePeriod(TimeUnit.MILLISECONDS));
+ final long penaltyUntil = System.currentTimeMillis() +
waitPenaltyDuration.asTimePeriod(TimeUnit.MILLISECONDS);
+ signalIdPenalties.put(signalId, penaltyUntil);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Penalizing signalId='{}' until {} because no
FlowFile was released this trigger", signalId, penaltyUntil);
+ }
}
// Update signal if needed.
try {
if (waitCompleted) {
protocol.complete(signalId);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Completed wait for signalId='{}' and removed
signal from cache", signalId);
+ }
} else if (waitProgressed) {
protocol.replace(signal);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Updated signal in cache for signalId='{}'
with counts={} releasableCount={}",
+ signalId, signal.getCounts(),
signal.getReleasableCount());
+ }
}
} catch (final IOException e) {