turcsanyip commented on a change in pull request #5634:
URL: https://github.com/apache/nifi/pull/5634#discussion_r780828831
##########
File path:
nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/AsynchronousCommitTracker.java
##########
@@ -89,7 +117,7 @@ public boolean isReady(final Connectable connectable) {
return false;
}
- if (isDataQueued(connectable)) {
+ if (isDataQueued(connectable) || (connectable.isTriggerWhenEmpty() && isDataHeld(connectable))) {
logger.debug("{} {} is ready because it has data queued", this, connectable);
return true;
}
Review comment:
The new condition could go into a separate `if` with its own log message:
"ready because it has data held" (or "ready because it has unacknowledged
data"). As written, the "data queued" message is also logged when the
component is ready only because it holds data.
The "not ready" log message below could be updated as well: "not ready
because it has no data queued or held".
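
A minimal, self-contained sketch of that split, to make the three outcomes
explicit. `SketchConnectable`, `Readiness` and `ReadinessSketch` are
hypothetical stand-ins, not NiFi classes, and the two predicates stand in for
`isDataQueued` and `isDataHeld`; the actual change would stay inside
`AsynchronousCommitTracker.isReady` and log through the existing `logger`.

```java
import java.util.function.Predicate;

// Hypothetical stand-in for NiFi's Connectable, reduced to the one method this sketch needs.
interface SketchConnectable {
    boolean isTriggerWhenEmpty();
}

// Outcome of the readiness check, paired with the log message proposed in the review.
enum Readiness {
    DATA_QUEUED("ready because it has data queued"),
    DATA_HELD("ready because it has data held"),
    NOT_READY("not ready because it has no data queued or held");

    final String message;

    Readiness(final String message) {
        this.message = message;
    }
}

class ReadinessSketch {
    // Assumed stand-ins for the tracker's isDataQueued / isDataHeld checks.
    private final Predicate<SketchConnectable> dataQueued;
    private final Predicate<SketchConnectable> dataHeld;

    ReadinessSketch(final Predicate<SketchConnectable> dataQueued,
                    final Predicate<SketchConnectable> dataHeld) {
        this.dataQueued = dataQueued;
        this.dataHeld = dataHeld;
    }

    // Each condition gets its own branch, so each can carry its own message.
    Readiness check(final SketchConnectable connectable) {
        if (dataQueued.test(connectable)) {
            return Readiness.DATA_QUEUED;
        }
        if (connectable.isTriggerWhenEmpty() && dataHeld.test(connectable)) {
            return Readiness.DATA_HELD;
        }
        return Readiness.NOT_READY;
    }

    // The real method would pass check(...).message to logger.debug before returning.
    boolean isReady(final SketchConnectable connectable) {
        return check(connectable) != Readiness.NOT_READY;
    }
}
```

The boolean result is the same as with the combined condition; only the
reported reason differs.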
##########
File path:
nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
##########
@@ -57,23 +58,22 @@ private StandardStatelessFlowCurrent(final Builder builder) {
this.processContextFactory = builder.processContextFactory;
}
- public Connectable getCurrentComponent() {
- return currentComponent;
- }
-
@Override
public void triggerFlow() {
try {
boolean completionReached = false;
while (!completionReached) {
+ tracker.shelveReadyComponents();
Review comment:
Is shelving the previously ready components really needed?
When I found the issue, I quickly fixed it by using the existing `ready`
`LinkedHashSet` and implementing a simple `getNextReady()` method in
`AsynchronousCommitTracker` that returns the top element of the ready "stack".
I tested it again with several flows and could not find any issues with it.
The advantage would be that the code is simpler and more straightforward (fewer
data structures, fewer loops).
At this point in the execution, the ready stack contains the component that
triggered the root components for more input (at the top of the stack), and may
contain some other components that were already ready. During the re-execution
of the flow from the root components, these components may become ready again,
in which case they are moved to the top of the stack, or they may simply wait in
the stack in their original position (just as if they had been shelved). In both
cases they will be triggered when their turn comes and will process all the
FlowFiles from their input queue. That's why I think that shelving is not needed.
The code would look like this:
```
while (!completionReached) {
    triggerRootConnectables();
    while (tracker.isAnyReady()) {
        final Connectable connectable = tracker.getNextReady();
        logger.debug("The next ready component to be triggered: {}", connectable);
        // Continually trigger the given component as long as it is ready to be triggered
        NextConnectable nextConnectable = triggerWhileReady(connectable);
        // If there's nothing left to do, return
        if (nextConnectable == NextConnectable.NONE) {
            return;
        }
        // If next connectable is whatever is ready, just continue loop
        if (nextConnectable == NextConnectable.NEXT_READY) {
            continue;
        }
        // Otherwise, we need to break out of this loop so that we can trigger root connectables or complete dataflow
        break;
    }
    // We have reached completion if the tracker does not know of any components ready to be triggered AND
    // we have no data queued in the flow (with the exception of Output Ports).
    completionReached = !tracker.isAnyReady() && isFlowQueueEmpty();
}
```
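
For illustration, a minimal sketch of how `getNextReady()` could sit on top of
a `LinkedHashSet` used as a stack. `ReadyStackSketch`, `addReady` and
`markNotReady` are hypothetical names, not the tracker's actual API, and the
sketch assumes that `getNextReady()` only peeks at the top element and that a
component is removed separately once it is no longer ready.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for the "ready" bookkeeping in AsynchronousCommitTracker.
class ReadyStackSketch<T> {
    // Insertion-ordered set: the most recently added element is the "top" of the stack.
    private final Set<T> ready = new LinkedHashSet<>();

    // Re-adding an element that is already present moves it to the top.
    void addReady(final T component) {
        ready.remove(component);
        ready.add(component);
    }

    // Assumed to be called once a component has nothing left to process.
    void markNotReady(final T component) {
        ready.remove(component);
    }

    boolean isAnyReady() {
        return !ready.isEmpty();
    }

    // Returns the top of the stack without removing it, or null if nothing is ready.
    // Iterates to the last element so the sketch does not depend on Java 21's getLast().
    T getNextReady() {
        T top = null;
        for (final T component : ready) {
            top = component;
        }
        return top;
    }
}
```

With `addReady("A")`, `addReady("B")`, `addReady("A")`, the next ready
component is `A`; after `markNotReady("A")` it is `B`, which kept its original
position in the set without any shelving step.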