turcsanyip commented on a change in pull request #5634:
URL: https://github.com/apache/nifi/pull/5634#discussion_r780828831



##########
File path: nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/AsynchronousCommitTracker.java
##########
@@ -89,7 +117,7 @@ public boolean isReady(final Connectable connectable) {
             return false;
         }
 
-        if (isDataQueued(connectable)) {
+        if (isDataQueued(connectable) || (connectable.isTriggerWhenEmpty() && isDataHeld(connectable))) {
             logger.debug("{} {} is ready because it has data queued", this, connectable);
             return true;
         }

Review comment:
       The new condition could go in a separate `if` with its own log message: "ready because it has data held" (or "ready because it has unacknowledged data"). As written, a component that is ready only because it holds data is logged as having data queued.
   The "not ready" log message below could also be updated: "not ready because it has no data queued or held".
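
   A minimal sketch of the split suggested above, assuming a hypothetical stand-in class (`ReadinessSketch`, with string component names and a returned reason instead of a logger call). It is not the NiFi `AsynchronousCommitTracker`; it only illustrates keeping the "queued" and "held" cases in separate branches so each gets its own message:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the readiness check; not the NiFi AsynchronousCommitTracker.
class ReadinessSketch {
    private final Set<String> queued = new HashSet<>(); // components with data in an input queue
    private final Set<String> held = new HashSet<>();   // components holding unacknowledged data

    void markQueued(String component) { queued.add(component); }
    void markHeld(String component) { held.add(component); }

    // Returns the reason that would be logged, so each branch stays distinguishable.
    String readinessReason(String component, boolean triggerWhenEmpty) {
        if (queued.contains(component)) {
            return "ready because it has data queued";
        }
        // Held data only counts for components that are triggered when empty.
        if (triggerWhenEmpty && held.contains(component)) {
            return "ready because it has data held";
        }
        return "not ready because it has no data queued or held";
    }
}
```

   In the real method each branch would call `logger.debug(...)` and return a boolean; the returned string here just makes the three outcomes easy to check.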

##########
File path: nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
##########
@@ -57,23 +58,22 @@ private StandardStatelessFlowCurrent(final Builder builder) {
         this.processContextFactory = builder.processContextFactory;
     }
 
-    public Connectable getCurrentComponent() {
-        return currentComponent;
-    }
-
     @Override
     public void triggerFlow() {
         try {
             boolean completionReached = false;
             while (!completionReached) {
+                tracker.shelveReadyComponents();

Review comment:
       Is shelving the previously ready components really needed?
   
   When I found the issue, I quickly fixed it by using the existing `ready` `LinkedHashSet` and implementing a simple `getNextReady()` method in `AsynchronousCommitTracker` that returns the top element of the ready "stack". I tested it again with several flows and could not find any issues. The advantage is that the code is simpler and more straightforward (fewer data structures, fewer loops).
   
   At this point in the execution, the ready stack contains the component that triggered the root components for more input (at the top of the stack), and may contain some other components that are already ready. During the re-execution of the flow from the root components, these components may become ready again, in which case they are moved to the top of the stack, or they may simply wait in the stack in their original position (just as if they had been shelved). In both cases they will be triggered when their turn comes and will process all the FlowFiles from their input queue. That's why I think shelving is not needed.
   
   The code would look like this:
   ```
               while (!completionReached) {
                   triggerRootConnectables();
   
                   while (tracker.isAnyReady()) {
                       final Connectable connectable = tracker.getNextReady();
                       logger.debug("The next ready component to be triggered: {}", connectable);
   
                       // Continually trigger the given component as long as it is ready to be triggered
                       NextConnectable nextConnectable = triggerWhileReady(connectable);
   
                       // If there's nothing left to do, return
                       if (nextConnectable == NextConnectable.NONE) {
                           return;
                       }
   
                       // If next connectable is whatever is ready, just continue loop
                       if (nextConnectable == NextConnectable.NEXT_READY) {
                           continue;
                       }
   
                       // Otherwise, we need to break out of this loop so that we can trigger root connectables or complete dataflow
                       break;
                   }
   
                   // We have reached completion if the tracker does not know of any components ready to be triggered AND
                   // we have no data queued in the flow (with the exception of Output Ports).
                   completionReached = !tracker.isAnyReady() && isFlowQueueEmpty();
               }
   ```
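
   For illustration, the `getNextReady()` idea described above could be sketched on top of a `LinkedHashSet` roughly as follows. The class `ReadyStackSketch` and its `addReady`/`markDone` methods are hypothetical names used only for this example, and the remove-then-add step is an assumption about how a component is moved to the top; this is not the actual `AsynchronousCommitTracker` code:

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of a "ready stack" backed by a LinkedHashSet; not the NiFi tracker itself.
class ReadyStackSketch<T> {
    // Insertion order is preserved, so the last element plays the role of the top of the stack.
    private final Set<T> ready = new LinkedHashSet<>();

    // Re-adding an element already in a LinkedHashSet keeps its old position,
    // so remove it first to move it to the top.
    void addReady(T component) {
        ready.remove(component);
        ready.add(component);
    }

    void markDone(T component) {
        ready.remove(component);
    }

    boolean isAnyReady() {
        return !ready.isEmpty();
    }

    // Returns the most recently added component, or null if nothing is ready.
    T getNextReady() {
        T top = null;
        for (T component : ready) {
            top = component; // after the loop, top holds the last element in insertion order
        }
        return top;
    }
}
```

   Finding the top this way walks the whole set, which should be acceptable when the number of ready components is small; a component that becomes ready again moves to the top, while one that does not simply keeps its position until its turn comes.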



