This is an automated email from the ASF dual-hosted git repository.

mariofusco pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-drools.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a2b9dbaba [DROOLS-7584] process completely all facts inserted before halt (#5576)
8a2b9dbaba is described below

commit 8a2b9dbaba8f0987c41c9ed2d1ee9dfdb9be0daf
Author: Mario Fusco <[email protected]>
AuthorDate: Tue Nov 7 15:17:02 2023 +0100

    [DROOLS-7584] process completely all facts inserted before halt (#5576)
    
    * [DROOLS-7584] process completely all facts inserted before halt
    
    * fix imports order
    
    * fix testModifyWithFromSudoku to meet halt() (#22)
    
    ---------
    
    Co-authored-by: Toshiya Kobayashi <[email protected]>
---
 .../drools/core/impl/ActivationsManagerImpl.java   | 57 ++++--------------
 .../java/org/drools/core/phreak/RuleExecutor.java  | 30 +++++-----
 .../drools/kiesession/agenda/DefaultAgenda.java    | 70 +++++++++++++++-------
 .../drools/model/codegen/execmodel/FromTest.java   |  5 +-
 .../mvel/integrationtests/FireUntilHaltTest.java   | 37 ++++++++++++
 5 files changed, 116 insertions(+), 83 deletions(-)

diff --git 
a/drools-core/src/main/java/org/drools/core/impl/ActivationsManagerImpl.java 
b/drools-core/src/main/java/org/drools/core/impl/ActivationsManagerImpl.java
index ab3d1a3b1b..7b4ea77967 100644
--- a/drools-core/src/main/java/org/drools/core/impl/ActivationsManagerImpl.java
+++ b/drools-core/src/main/java/org/drools/core/impl/ActivationsManagerImpl.java
@@ -184,8 +184,6 @@ public class ActivationsManagerImpl implements 
ActivationsManager {
 
     @Override
     public void cancelActivation(InternalMatch internalMatch) {
-        InternalMatch item = internalMatch;
-
         if ( internalMatch.isQueued() ) {
             if (internalMatch.getActivationGroupNode() != null ) {
                 
internalMatch.getActivationGroupNode().getActivationGroup().removeActivation(internalMatch);
@@ -194,11 +192,11 @@ public class ActivationsManagerImpl implements 
ActivationsManager {
             getAgendaEventSupport().fireActivationCancelled(internalMatch, 
reteEvaluator, MatchCancelledCause.WME_MODIFY);
         }
 
-        if (item.getRuleAgendaItem() != null) {
-            item.getRuleAgendaItem().getRuleExecutor().fireConsequenceEvent( 
this.reteEvaluator, this, item, ON_DELETE_MATCH_CONSEQUENCE_NAME );
+        if (internalMatch.getRuleAgendaItem() != null) {
+            
internalMatch.getRuleAgendaItem().getRuleExecutor().fireConsequenceEvent( 
this.reteEvaluator, this, internalMatch, ON_DELETE_MATCH_CONSEQUENCE_NAME );
         }
 
-        reteEvaluator.getRuleEventSupport().onDeleteMatch( item );
+        reteEvaluator.getRuleEventSupport().onDeleteMatch(internalMatch);
     }
 
     @Override
@@ -284,10 +282,10 @@ public class ActivationsManagerImpl implements 
ActivationsManager {
 
     @Override
     public int fireAllRules(AgendaFilter agendaFilter, int fireLimit) {
-        return fireLoop( agendaFilter, fireLimit, RestHandler.FIRE_ALL_RULES );
+        return fireLoop( agendaFilter, fireLimit );
     }
 
-    private int fireLoop(AgendaFilter agendaFilter, int fireLimit, RestHandler 
restHandler) {
+    private int fireLoop(AgendaFilter agendaFilter, int fireLimit) {
         firing = true;
         int fireCount = 0;
         PropagationEntry head = propagationList.takeAll();
@@ -320,7 +318,7 @@ public class ActivationsManagerImpl implements 
ActivationsManager {
 
             if ( returnedFireCount == 0 && head == null && ( group == null || 
( group.isEmpty() && !group.isAutoDeactivate() ) ) && !flushExpirations() ) {
                 // if true, the engine is now considered potentially at rest
-                head = restHandler.handleRest( this );
+                head = handleRest();
             }
         }
 
@@ -349,44 +347,11 @@ public class ActivationsManagerImpl implements 
ActivationsManager {
         }
     }
 
-    interface RestHandler {
-        RestHandler FIRE_ALL_RULES = new RestHandler.FireAllRulesRestHandler();
-        RestHandler FIRE_UNTIL_HALT = new 
RestHandler.FireUntilHaltRestHandler();
-
-        PropagationEntry handleRest(ActivationsManagerImpl agenda);
-
-        class FireAllRulesRestHandler implements RestHandler {
-            @Override
-            public PropagationEntry handleRest(ActivationsManagerImpl agenda) {
-                PropagationEntry head = agenda.propagationList.takeAll();
-                if (head == null) {
-                    agenda.firing = false;
-                }
-                return head;
-            }
-        }
-
-        class FireUntilHaltRestHandler implements RestHandler {
-            @Override
-            public PropagationEntry handleRest(ActivationsManagerImpl agenda) {
-                PropagationEntry head;
-                // this must use the same sync target as takeAllPropagations, 
to ensure this entire block is atomic, up to the point of wait
-                synchronized (agenda.propagationList) {
-                    head = agenda.propagationList.takeAll();
-
-                    // if halt() has called, the thread should not be put into 
a wait state
-                    // instead this is just a safe way to make sure the queue 
is flushed before exiting the loop
-                    if (head == null) {
-                        agenda.propagationList.waitOnRest();
-                        head = agenda.propagationList.takeAll();
-                        if (head == null) {
-                            agenda.firing = false;
-                        }
-                    }
-                }
-
-                return head;
-            }
+    private PropagationEntry handleRest() {
+        PropagationEntry head = propagationList.takeAll();
+        if (head == null) {
+            firing = false;
         }
+        return head;
     }
 }
diff --git a/drools-core/src/main/java/org/drools/core/phreak/RuleExecutor.java 
b/drools-core/src/main/java/org/drools/core/phreak/RuleExecutor.java
index 29a2121a66..b4f9da38bc 100644
--- a/drools-core/src/main/java/org/drools/core/phreak/RuleExecutor.java
+++ b/drools-core/src/main/java/org/drools/core/phreak/RuleExecutor.java
@@ -110,11 +110,7 @@ public class RuleExecutor {
         return fire(activationsManager.getReteEvaluator(), activationsManager, 
filter, fireCount, fireLimit);
     }
 
-    private int fire( ReteEvaluator reteEvaluator,
-                      ActivationsManager activationsManager,
-                      AgendaFilter filter,
-                      int fireCount,
-                      int fireLimit) {
+    private int fire( ReteEvaluator reteEvaluator, ActivationsManager 
activationsManager, AgendaFilter filter, int fireCount, int fireLimit) {
         int localFireCount = 0;
 
         if (!tupleList.isEmpty()) {
@@ -167,8 +163,10 @@ public class RuleExecutor {
                 }
 
                 if (!ruleIsAllMatches) { // if firing rule is @All don't give 
way to other rules
-                    if ( haltRuleFiring( fireCount, fireLimit, localFireCount, 
activationsManager ) ) {
-                        break; // another rule has high priority and is on the 
agenda, so evaluate it first
+                    if ( firingHalted(activationsManager) ||
+                         fireLimitReached(fireCount, fireLimit, 
localFireCount) ||
+                         ruleWithHigherSalienceActivated(activationsManager) ) 
{
+                        break;
                     }
                     if (!reteEvaluator.isSequential()) {
                         evaluateNetworkIfDirty( activationsManager );
@@ -261,22 +259,22 @@ public class RuleExecutor {
         return filter != null && !filter.accept((InternalMatch) leftTuple);
     }
 
-    private boolean haltRuleFiring(int fireCount,
-                                   int fireLimit,
-                                   int localFireCount,
-                                   ActivationsManager activationsManager) {
-        if (!activationsManager.isFiring() || (fireLimit >= 0 && 
(localFireCount + fireCount >= fireLimit))) {
-            return true;
-        }
+    private static boolean firingHalted(ActivationsManager activationsManager) 
{
+        return !activationsManager.isFiring();
+    }
 
+    private boolean ruleWithHigherSalienceActivated(ActivationsManager 
activationsManager) {
         // The eager list must be evaluated first, as dynamic salience rules 
will impact the results of peekNextRule
         activationsManager.evaluateEagerList();
-
         RuleAgendaItem nextRule = activationsManager.peekNextRule();
         if (nextRule == ruleAgendaItem || nextRule == null) {
             return false;
         }
-        return !ruleAgendaItem.getAgendaGroup().equals( 
nextRule.getAgendaGroup() ) || !isHigherSalience(nextRule);
+        return 
!ruleAgendaItem.getAgendaGroup().equals(nextRule.getAgendaGroup()) || 
!isHigherSalience(nextRule);
+    }
+
+    private static boolean fireLimitReached(int fireCount, int fireLimit, int 
localFireCount) {
+        return fireLimit >= 0 && (localFireCount + fireCount >= fireLimit);
     }
 
     private boolean isHigherSalience(RuleAgendaItem nextRule) {
diff --git 
a/drools-kiesession/src/main/java/org/drools/kiesession/agenda/DefaultAgenda.java
 
b/drools-kiesession/src/main/java/org/drools/kiesession/agenda/DefaultAgenda.java
index 66221696d2..e74c10e80a 100644
--- 
a/drools-kiesession/src/main/java/org/drools/kiesession/agenda/DefaultAgenda.java
+++ 
b/drools-kiesession/src/main/java/org/drools/kiesession/agenda/DefaultAgenda.java
@@ -18,6 +18,13 @@
  */
 package org.drools.kiesession.agenda;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.drools.base.definitions.rule.impl.QueryImpl;
 import org.drools.base.definitions.rule.impl.RuleImpl;
 import org.drools.core.RuleBaseConfiguration;
@@ -67,13 +74,6 @@ import org.kie.api.runtime.rule.AgendaGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * Rule-firing Agenda.
  * 
@@ -576,7 +576,7 @@ public class DefaultAgenda implements InternalAgenda {
     private int fireLoop(AgendaFilter agendaFilter, int fireLimit, RestHandler 
restHandler, boolean isInternalFire) {
         int fireCount = 0;
         try {
-            PropagationEntry head = propagationList.takeAll();
+            PropagationEntry head = takePropagationHead();
             int returnedFireCount;
 
             boolean limitReached = fireLimit == 0; // -1 or > 0 will return 
false. No reason for user to give 0, just handled for completeness.
@@ -604,19 +604,13 @@ public class DefaultAgenda implements InternalAgenda {
             // Note that if a halt() command is given, the engine is changed 
to INACTIVE,
             // and isFiring returns false allowing it to exit before all rules 
are fired.
             //
-            while ( isFiring()  )  {
+            while ( isFiring() || 
executionStateMachine.getCurrentState().isHalting() )  {
                 if ( head != null ) {
                     // it is possible that there are no action propagations, 
but there are rules to fire.
                     propagationList.flush(head);
                     head = null;
                 }
 
-                // a halt may have occurred during the flushPropagations,
-                // which changes the isFiring state. So a second isFiring 
guard is needed
-                if (!isFiring()) {
-                    break;
-                }
-
                 evaluateEagerList();
                 InternalAgendaGroup group = 
getAgendaGroupsManager().getNextFocus();
                 if ( group != null && !limitReached ) {
@@ -627,7 +621,7 @@ public class DefaultAgenda implements InternalAgenda {
                     fireCount += returnedFireCount;
 
                     limitReached = ( fireLimit > 0 && fireCount >= fireLimit );
-                    head = propagationList.takeAll();
+                    head = takePropagationHead();
                 } else {
                     returnedFireCount = 0; // no rules fired this iteration, 
so we know this is 0
                     group = null; // set the group to null in case the fire 
limit has been reached
@@ -636,7 +630,7 @@ public class DefaultAgenda implements InternalAgenda {
                 if ( returnedFireCount == 0 && head == null && ( group == null 
|| ( group.isEmpty() && !group.isAutoDeactivate() ) ) && !flushExpirations() ) {
                     // if true, the engine is now considered potentially at 
rest
                     head = restHandler.handleRest( this, isInternalFire );
-                    if (!isInternalFire && head == null) {
+                    if ( ( !isInternalFire || 
executionStateMachine.getCurrentState().isHalting() ) && head == null) {
                         break;
                     }
                 }
@@ -653,6 +647,13 @@ public class DefaultAgenda implements InternalAgenda {
         return fireCount;
     }
 
+    private PropagationEntry takePropagationHead() {
+        if (executionStateMachine.getCurrentState().isHalting()) {
+            return null;
+        }
+        return propagationList.takeAll();
+    }
+
     interface RestHandler {
         RestHandler FIRE_ALL_RULES = new FireAllRulesRestHandler();
         RestHandler FIRE_UNTIL_HALT = new FireUntilHaltRestHandler();
@@ -684,7 +685,7 @@ public class DefaultAgenda implements InternalAgenda {
                 PropagationEntry head;
                 // this must use the same sync target as takeAllPropagations, 
to ensure this entire block is atomic, up to the point of wait
                 synchronized (agenda.propagationList) {
-                    head = agenda.propagationList.takeAll();
+                    head = agenda.takePropagationHead();
 
                     // if halt() has called, the thread should not be put into 
a wait state
                     // instead this is just a safe way to make sure the queue 
is flushed before exiting the loop
@@ -692,7 +693,7 @@ public class DefaultAgenda implements InternalAgenda {
                             agenda.executionStateMachine.getCurrentState() == 
ExecutionStateMachine.ExecutionState.FIRING_UNTIL_HALT ||
                             agenda.executionStateMachine.getCurrentState() == 
ExecutionStateMachine.ExecutionState.INACTIVE_ON_FIRING_UNTIL_HALT )) {
                         agenda.propagationList.waitOnRest();
-                        head = agenda.propagationList.takeAll();
+                        head = agenda.takePropagationHead();
                     }
                 }
 
@@ -771,13 +772,38 @@ public class DefaultAgenda implements InternalAgenda {
         }
     }
 
+    static class ImmediateHalt extends 
PropagationEntry.AbstractPropagationEntry {
+
+        private final ExecutionStateMachine executionStateMachine;
+        private final PropagationList propagationList;
+
+        protected ImmediateHalt( ExecutionStateMachine executionStateMachine, 
PropagationList propagationList ) {
+            this.executionStateMachine = executionStateMachine;
+            this.propagationList = propagationList;
+        }
+
+        @Override
+        public void internalExecute(ReteEvaluator reteEvaluator ) {
+            executionStateMachine.immediateHalt(propagationList);
+            reteEvaluator.getActivationsManager().haltGroupEvaluation();
+        }
+
+        @Override
+        public String toString() {
+            return "ImmediateHalt";
+        }
+    }
+
     @Override
     public synchronized void halt() {
         // only attempt halt an engine that is currently firing
         // This will place a halt command on the propagation queue
         // that will allow the engine to halt safely
         if ( isFiring() ) {
-            propagationList.addEntry(new Halt(executionStateMachine));
+            PropagationEntry halt = executionStateMachine.getCurrentState() == 
ExecutionStateMachine.ExecutionState.FIRING_ALL_RULES ?
+                    new ImmediateHalt(executionStateMachine, propagationList) :
+                    new Halt(executionStateMachine);
+            propagationList.addEntry(halt);
         }
     }
 
@@ -881,6 +907,10 @@ public class DefaultAgenda implements InternalAgenda {
                 return firing;
             }
 
+            public boolean isHalting() {
+                return this == HALTING;
+            }
+
             public boolean isAlive() {
                 return alive;
             }
diff --git 
a/drools-model/drools-model-codegen/src/test/java/org/drools/model/codegen/execmodel/FromTest.java
 
b/drools-model/drools-model-codegen/src/test/java/org/drools/model/codegen/execmodel/FromTest.java
index 23f4b42e58..afbcfe1840 100644
--- 
a/drools-model/drools-model-codegen/src/test/java/org/drools/model/codegen/execmodel/FromTest.java
+++ 
b/drools-model/drools-model-codegen/src/test/java/org/drools/model/codegen/execmodel/FromTest.java
@@ -259,7 +259,7 @@ public class FromTest extends BaseModelTest {
                            "rule \"set a value\" when\n" +
                            "  $s : Setting( $rn: rowNo, $cn: colNo, $v: value 
)\n" +
                            "  $c : Cell( rowNo == $rn, colNo == $cn, value == 
null)\n" +
-                           "  $ctr : Counter( $count: count )\n" +
+                           "  $ctr : Counter( $count: count != 0 )\n" +
                            "then\n" +
                            "  System.out.println(\"set a value [\" + $v + \"] 
to $c = \" + $c);\n" +
                            "  modify( $c ){ setValue( $v ) }\n" +
@@ -269,6 +269,7 @@ public class FromTest extends BaseModelTest {
                            "  $s: Setting( $rn: rowNo, $cn: colNo, $v: value 
)\n" +
                            "  Cell( rowNo == $rn, colNo == $cn, value == $v, 
$exCells: exCells )\n" +
                            "  $c: Cell( free contains $v ) from $exCells\n" +
+                           "  Counter( $count: count != 0 )\n" +
                            "then\n" +
                            "  System.out.println(\"eliminate a value [\" + $v 
+ \"] from Cell : $c = \" + $c);\n" +
                            "  modify( $c ){ blockValue( $v ) }\n" +
@@ -280,6 +281,7 @@ public class FromTest extends BaseModelTest {
                            "    not( $x: Cell( free contains $v )\n" +
                            "         and\n" +
                            "         Cell( this == $c, exCells contains $x ) 
)\n" +
+                           "    Counter( $count: count != 0 )\n" +
                            "then\n" +
                            "    System.out.println( \"done setting cell \" + 
$c.toString() ); \n" +
                            "    delete( $s );\n" +
@@ -288,6 +290,7 @@ public class FromTest extends BaseModelTest {
                            "when\n" +
                            "    not Setting()\n" +
                            "    $c: Cell( $rn: rowNo, $cn: colNo, freeCount == 
1 )\n" +
+                           "    Counter( $count: count != 0 )\n" +
                            "then\n" +
                            "    Integer i = $c.getFreeValue();\n" +
                            "    System.out.println( \"single \" + i + \" at \" 
+ $c.toString() );\n" +
diff --git 
a/drools-test-coverage/test-compiler-integration/src/test/java/org/drools/mvel/integrationtests/FireUntilHaltTest.java
 
b/drools-test-coverage/test-compiler-integration/src/test/java/org/drools/mvel/integrationtests/FireUntilHaltTest.java
index d734e8586b..4426692133 100644
--- 
a/drools-test-coverage/test-compiler-integration/src/test/java/org/drools/mvel/integrationtests/FireUntilHaltTest.java
+++ 
b/drools-test-coverage/test-compiler-integration/src/test/java/org/drools/mvel/integrationtests/FireUntilHaltTest.java
@@ -21,6 +21,9 @@ package org.drools.mvel.integrationtests;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.drools.mvel.compiler.Cheese;
 import org.drools.mvel.compiler.Person;
@@ -161,4 +164,38 @@ public class FireUntilHaltTest {
         assertThat(alive).as("Thread should have died!").isFalse();
         assertThat(list.size()).isEqualTo(1);
     }
+
+    @Test
+    public void testAllFactsProcessedBeforeHalt() throws Exception {
+        String drl = "package org.example.drools;\n" +
+                "\n" +
+                "global java.util.concurrent.CountDownLatch latch;\n" +
+                "\n" +
+                "rule \"R1\" when\n" +
+                "    $s : String()\n" +
+                "then\n" +
+                "    latch.countDown();\n" +
+                "end\n" +
+                "rule \"R2\" when\n" +
+                "    $s : String()\n" +
+                "then\n" +
+                "    latch.countDown();\n" +
+                "end\n";
+
+        KieBase kbase = KieBaseUtil.getKieBaseFromKieModuleFromDrl("test", 
kieBaseTestConfiguration, drl);
+        KieSession ksession = kbase.newKieSession();
+
+        CountDownLatch latch = new CountDownLatch(4);
+        ksession.setGlobal("latch", latch);
+
+        Executors.newSingleThreadExecutor().execute(ksession::fireUntilHalt);
+
+        ksession.insert("aaa");
+        ksession.insert("bbb");
+
+        ksession.halt();
+
+        // the 2 facts inserted should be processed before halt
+        assertThat(latch.await(100, TimeUnit.MILLISECONDS)).isTrue();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to