This is an automated email from the ASF dual-hosted git repository.
mariofusco pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-drools.git
The following commit(s) were added to refs/heads/main by this push:
new 8a2b9dbaba [DROOLS-7584] process completely all facts inserted before
halt (#5576)
8a2b9dbaba is described below
commit 8a2b9dbaba8f0987c41c9ed2d1ee9dfdb9be0daf
Author: Mario Fusco <[email protected]>
AuthorDate: Tue Nov 7 15:17:02 2023 +0100
[DROOLS-7584] process completely all facts inserted before halt (#5576)
* [DROOLS-7584] process completely all facts inserted before halt
* fix imports order
* fix testModifyWithFromSudoku to meet halt() (#22)
---------
Co-authored-by: Toshiya Kobayashi <[email protected]>
---
.../drools/core/impl/ActivationsManagerImpl.java | 57 ++++--------------
.../java/org/drools/core/phreak/RuleExecutor.java | 30 +++++-----
.../drools/kiesession/agenda/DefaultAgenda.java | 70 +++++++++++++++-------
.../drools/model/codegen/execmodel/FromTest.java | 5 +-
.../mvel/integrationtests/FireUntilHaltTest.java | 37 ++++++++++++
5 files changed, 116 insertions(+), 83 deletions(-)
diff --git
a/drools-core/src/main/java/org/drools/core/impl/ActivationsManagerImpl.java
b/drools-core/src/main/java/org/drools/core/impl/ActivationsManagerImpl.java
index ab3d1a3b1b..7b4ea77967 100644
--- a/drools-core/src/main/java/org/drools/core/impl/ActivationsManagerImpl.java
+++ b/drools-core/src/main/java/org/drools/core/impl/ActivationsManagerImpl.java
@@ -184,8 +184,6 @@ public class ActivationsManagerImpl implements
ActivationsManager {
@Override
public void cancelActivation(InternalMatch internalMatch) {
- InternalMatch item = internalMatch;
-
if ( internalMatch.isQueued() ) {
if (internalMatch.getActivationGroupNode() != null ) {
internalMatch.getActivationGroupNode().getActivationGroup().removeActivation(internalMatch);
@@ -194,11 +192,11 @@ public class ActivationsManagerImpl implements
ActivationsManager {
getAgendaEventSupport().fireActivationCancelled(internalMatch,
reteEvaluator, MatchCancelledCause.WME_MODIFY);
}
- if (item.getRuleAgendaItem() != null) {
- item.getRuleAgendaItem().getRuleExecutor().fireConsequenceEvent(
this.reteEvaluator, this, item, ON_DELETE_MATCH_CONSEQUENCE_NAME );
+ if (internalMatch.getRuleAgendaItem() != null) {
+
internalMatch.getRuleAgendaItem().getRuleExecutor().fireConsequenceEvent(
this.reteEvaluator, this, internalMatch, ON_DELETE_MATCH_CONSEQUENCE_NAME );
}
- reteEvaluator.getRuleEventSupport().onDeleteMatch( item );
+ reteEvaluator.getRuleEventSupport().onDeleteMatch(internalMatch);
}
@Override
@@ -284,10 +282,10 @@ public class ActivationsManagerImpl implements
ActivationsManager {
@Override
public int fireAllRules(AgendaFilter agendaFilter, int fireLimit) {
- return fireLoop( agendaFilter, fireLimit, RestHandler.FIRE_ALL_RULES );
+ return fireLoop( agendaFilter, fireLimit );
}
- private int fireLoop(AgendaFilter agendaFilter, int fireLimit, RestHandler
restHandler) {
+ private int fireLoop(AgendaFilter agendaFilter, int fireLimit) {
firing = true;
int fireCount = 0;
PropagationEntry head = propagationList.takeAll();
@@ -320,7 +318,7 @@ public class ActivationsManagerImpl implements
ActivationsManager {
if ( returnedFireCount == 0 && head == null && ( group == null ||
( group.isEmpty() && !group.isAutoDeactivate() ) ) && !flushExpirations() ) {
// if true, the engine is now considered potentially at rest
- head = restHandler.handleRest( this );
+ head = handleRest();
}
}
@@ -349,44 +347,11 @@ public class ActivationsManagerImpl implements
ActivationsManager {
}
}
- interface RestHandler {
- RestHandler FIRE_ALL_RULES = new RestHandler.FireAllRulesRestHandler();
- RestHandler FIRE_UNTIL_HALT = new
RestHandler.FireUntilHaltRestHandler();
-
- PropagationEntry handleRest(ActivationsManagerImpl agenda);
-
- class FireAllRulesRestHandler implements RestHandler {
- @Override
- public PropagationEntry handleRest(ActivationsManagerImpl agenda) {
- PropagationEntry head = agenda.propagationList.takeAll();
- if (head == null) {
- agenda.firing = false;
- }
- return head;
- }
- }
-
- class FireUntilHaltRestHandler implements RestHandler {
- @Override
- public PropagationEntry handleRest(ActivationsManagerImpl agenda) {
- PropagationEntry head;
- // this must use the same sync target as takeAllPropagations,
to ensure this entire block is atomic, up to the point of wait
- synchronized (agenda.propagationList) {
- head = agenda.propagationList.takeAll();
-
- // if halt() has called, the thread should not be put into
a wait state
- // instead this is just a safe way to make sure the queue
is flushed before exiting the loop
- if (head == null) {
- agenda.propagationList.waitOnRest();
- head = agenda.propagationList.takeAll();
- if (head == null) {
- agenda.firing = false;
- }
- }
- }
-
- return head;
- }
+ private PropagationEntry handleRest() {
+ PropagationEntry head = propagationList.takeAll();
+ if (head == null) {
+ firing = false;
}
+ return head;
}
}
diff --git a/drools-core/src/main/java/org/drools/core/phreak/RuleExecutor.java
b/drools-core/src/main/java/org/drools/core/phreak/RuleExecutor.java
index 29a2121a66..b4f9da38bc 100644
--- a/drools-core/src/main/java/org/drools/core/phreak/RuleExecutor.java
+++ b/drools-core/src/main/java/org/drools/core/phreak/RuleExecutor.java
@@ -110,11 +110,7 @@ public class RuleExecutor {
return fire(activationsManager.getReteEvaluator(), activationsManager,
filter, fireCount, fireLimit);
}
- private int fire( ReteEvaluator reteEvaluator,
- ActivationsManager activationsManager,
- AgendaFilter filter,
- int fireCount,
- int fireLimit) {
+ private int fire( ReteEvaluator reteEvaluator, ActivationsManager
activationsManager, AgendaFilter filter, int fireCount, int fireLimit) {
int localFireCount = 0;
if (!tupleList.isEmpty()) {
@@ -167,8 +163,10 @@ public class RuleExecutor {
}
if (!ruleIsAllMatches) { // if firing rule is @All don't give
way to other rules
- if ( haltRuleFiring( fireCount, fireLimit, localFireCount,
activationsManager ) ) {
- break; // another rule has high priority and is on the
agenda, so evaluate it first
+ if ( firingHalted(activationsManager) ||
+ fireLimitReached(fireCount, fireLimit,
localFireCount) ||
+ ruleWithHigherSalienceActivated(activationsManager) )
{
+ break;
}
if (!reteEvaluator.isSequential()) {
evaluateNetworkIfDirty( activationsManager );
@@ -261,22 +259,22 @@ public class RuleExecutor {
return filter != null && !filter.accept((InternalMatch) leftTuple);
}
- private boolean haltRuleFiring(int fireCount,
- int fireLimit,
- int localFireCount,
- ActivationsManager activationsManager) {
- if (!activationsManager.isFiring() || (fireLimit >= 0 &&
(localFireCount + fireCount >= fireLimit))) {
- return true;
- }
+ private static boolean firingHalted(ActivationsManager activationsManager)
{
+ return !activationsManager.isFiring();
+ }
+ private boolean ruleWithHigherSalienceActivated(ActivationsManager
activationsManager) {
// The eager list must be evaluated first, as dynamic salience rules
will impact the results of peekNextRule
activationsManager.evaluateEagerList();
-
RuleAgendaItem nextRule = activationsManager.peekNextRule();
if (nextRule == ruleAgendaItem || nextRule == null) {
return false;
}
- return !ruleAgendaItem.getAgendaGroup().equals(
nextRule.getAgendaGroup() ) || !isHigherSalience(nextRule);
+ return
!ruleAgendaItem.getAgendaGroup().equals(nextRule.getAgendaGroup()) ||
!isHigherSalience(nextRule);
+ }
+
+ private static boolean fireLimitReached(int fireCount, int fireLimit, int
localFireCount) {
+ return fireLimit >= 0 && (localFireCount + fireCount >= fireLimit);
}
private boolean isHigherSalience(RuleAgendaItem nextRule) {
diff --git
a/drools-kiesession/src/main/java/org/drools/kiesession/agenda/DefaultAgenda.java
b/drools-kiesession/src/main/java/org/drools/kiesession/agenda/DefaultAgenda.java
index 66221696d2..e74c10e80a 100644
---
a/drools-kiesession/src/main/java/org/drools/kiesession/agenda/DefaultAgenda.java
+++
b/drools-kiesession/src/main/java/org/drools/kiesession/agenda/DefaultAgenda.java
@@ -18,6 +18,13 @@
*/
package org.drools.kiesession.agenda;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.drools.base.definitions.rule.impl.QueryImpl;
import org.drools.base.definitions.rule.impl.RuleImpl;
import org.drools.core.RuleBaseConfiguration;
@@ -67,13 +74,6 @@ import org.kie.api.runtime.rule.AgendaGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* Rule-firing Agenda.
*
@@ -576,7 +576,7 @@ public class DefaultAgenda implements InternalAgenda {
private int fireLoop(AgendaFilter agendaFilter, int fireLimit, RestHandler
restHandler, boolean isInternalFire) {
int fireCount = 0;
try {
- PropagationEntry head = propagationList.takeAll();
+ PropagationEntry head = takePropagationHead();
int returnedFireCount;
boolean limitReached = fireLimit == 0; // -1 or > 0 will return
false. No reason for user to give 0, just handled for completeness.
@@ -604,19 +604,13 @@ public class DefaultAgenda implements InternalAgenda {
// Note that if a halt() command is given, the engine is changed
to INACTIVE,
// and isFiring returns false allowing it to exit before all rules
are fired.
//
- while ( isFiring() ) {
+ while ( isFiring() ||
executionStateMachine.getCurrentState().isHalting() ) {
if ( head != null ) {
// it is possible that there are no action propagations,
but there are rules to fire.
propagationList.flush(head);
head = null;
}
- // a halt may have occurred during the flushPropagations,
- // which changes the isFiring state. So a second isFiring
guard is needed
- if (!isFiring()) {
- break;
- }
-
evaluateEagerList();
InternalAgendaGroup group =
getAgendaGroupsManager().getNextFocus();
if ( group != null && !limitReached ) {
@@ -627,7 +621,7 @@ public class DefaultAgenda implements InternalAgenda {
fireCount += returnedFireCount;
limitReached = ( fireLimit > 0 && fireCount >= fireLimit );
- head = propagationList.takeAll();
+ head = takePropagationHead();
} else {
returnedFireCount = 0; // no rules fired this iteration,
so we know this is 0
group = null; // set the group to null in case the fire
limit has been reached
@@ -636,7 +630,7 @@ public class DefaultAgenda implements InternalAgenda {
if ( returnedFireCount == 0 && head == null && ( group == null
|| ( group.isEmpty() && !group.isAutoDeactivate() ) ) && !flushExpirations() ) {
// if true, the engine is now considered potentially at
rest
head = restHandler.handleRest( this, isInternalFire );
- if (!isInternalFire && head == null) {
+ if ( ( !isInternalFire ||
executionStateMachine.getCurrentState().isHalting() ) && head == null) {
break;
}
}
@@ -653,6 +647,13 @@ public class DefaultAgenda implements InternalAgenda {
return fireCount;
}
+ private PropagationEntry takePropagationHead() {
+ if (executionStateMachine.getCurrentState().isHalting()) {
+ return null;
+ }
+ return propagationList.takeAll();
+ }
+
interface RestHandler {
RestHandler FIRE_ALL_RULES = new FireAllRulesRestHandler();
RestHandler FIRE_UNTIL_HALT = new FireUntilHaltRestHandler();
@@ -684,7 +685,7 @@ public class DefaultAgenda implements InternalAgenda {
PropagationEntry head;
// this must use the same sync target as takeAllPropagations,
to ensure this entire block is atomic, up to the point of wait
synchronized (agenda.propagationList) {
- head = agenda.propagationList.takeAll();
+ head = agenda.takePropagationHead();
// if halt() has called, the thread should not be put into
a wait state
// instead this is just a safe way to make sure the queue
is flushed before exiting the loop
@@ -692,7 +693,7 @@ public class DefaultAgenda implements InternalAgenda {
agenda.executionStateMachine.getCurrentState() ==
ExecutionStateMachine.ExecutionState.FIRING_UNTIL_HALT ||
agenda.executionStateMachine.getCurrentState() ==
ExecutionStateMachine.ExecutionState.INACTIVE_ON_FIRING_UNTIL_HALT )) {
agenda.propagationList.waitOnRest();
- head = agenda.propagationList.takeAll();
+ head = agenda.takePropagationHead();
}
}
@@ -771,13 +772,38 @@ public class DefaultAgenda implements InternalAgenda {
}
}
+ static class ImmediateHalt extends
PropagationEntry.AbstractPropagationEntry {
+
+ private final ExecutionStateMachine executionStateMachine;
+ private final PropagationList propagationList;
+
+ protected ImmediateHalt( ExecutionStateMachine executionStateMachine,
PropagationList propagationList ) {
+ this.executionStateMachine = executionStateMachine;
+ this.propagationList = propagationList;
+ }
+
+ @Override
+ public void internalExecute(ReteEvaluator reteEvaluator ) {
+ executionStateMachine.immediateHalt(propagationList);
+ reteEvaluator.getActivationsManager().haltGroupEvaluation();
+ }
+
+ @Override
+ public String toString() {
+ return "ImmediateHalt";
+ }
+ }
+
@Override
public synchronized void halt() {
// only attempt halt an engine that is currently firing
// This will place a halt command on the propagation queue
// that will allow the engine to halt safely
if ( isFiring() ) {
- propagationList.addEntry(new Halt(executionStateMachine));
+ PropagationEntry halt = executionStateMachine.getCurrentState() ==
ExecutionStateMachine.ExecutionState.FIRING_ALL_RULES ?
+ new ImmediateHalt(executionStateMachine, propagationList) :
+ new Halt(executionStateMachine);
+ propagationList.addEntry(halt);
}
}
@@ -881,6 +907,10 @@ public class DefaultAgenda implements InternalAgenda {
return firing;
}
+ public boolean isHalting() {
+ return this == HALTING;
+ }
+
public boolean isAlive() {
return alive;
}
diff --git
a/drools-model/drools-model-codegen/src/test/java/org/drools/model/codegen/execmodel/FromTest.java
b/drools-model/drools-model-codegen/src/test/java/org/drools/model/codegen/execmodel/FromTest.java
index 23f4b42e58..afbcfe1840 100644
---
a/drools-model/drools-model-codegen/src/test/java/org/drools/model/codegen/execmodel/FromTest.java
+++
b/drools-model/drools-model-codegen/src/test/java/org/drools/model/codegen/execmodel/FromTest.java
@@ -259,7 +259,7 @@ public class FromTest extends BaseModelTest {
"rule \"set a value\" when\n" +
" $s : Setting( $rn: rowNo, $cn: colNo, $v: value
)\n" +
" $c : Cell( rowNo == $rn, colNo == $cn, value ==
null)\n" +
- " $ctr : Counter( $count: count )\n" +
+ " $ctr : Counter( $count: count != 0 )\n" +
"then\n" +
" System.out.println(\"set a value [\" + $v + \"]
to $c = \" + $c);\n" +
" modify( $c ){ setValue( $v ) }\n" +
@@ -269,6 +269,7 @@ public class FromTest extends BaseModelTest {
" $s: Setting( $rn: rowNo, $cn: colNo, $v: value
)\n" +
" Cell( rowNo == $rn, colNo == $cn, value == $v,
$exCells: exCells )\n" +
" $c: Cell( free contains $v ) from $exCells\n" +
+ " Counter( $count: count != 0 )\n" +
"then\n" +
" System.out.println(\"eliminate a value [\" + $v
+ \"] from Cell : $c = \" + $c);\n" +
" modify( $c ){ blockValue( $v ) }\n" +
@@ -280,6 +281,7 @@ public class FromTest extends BaseModelTest {
" not( $x: Cell( free contains $v )\n" +
" and\n" +
" Cell( this == $c, exCells contains $x )
)\n" +
+ " Counter( $count: count != 0 )\n" +
"then\n" +
" System.out.println( \"done setting cell \" +
$c.toString() ); \n" +
" delete( $s );\n" +
@@ -288,6 +290,7 @@ public class FromTest extends BaseModelTest {
"when\n" +
" not Setting()\n" +
" $c: Cell( $rn: rowNo, $cn: colNo, freeCount ==
1 )\n" +
+ " Counter( $count: count != 0 )\n" +
"then\n" +
" Integer i = $c.getFreeValue();\n" +
" System.out.println( \"single \" + i + \" at \"
+ $c.toString() );\n" +
diff --git
a/drools-test-coverage/test-compiler-integration/src/test/java/org/drools/mvel/integrationtests/FireUntilHaltTest.java
b/drools-test-coverage/test-compiler-integration/src/test/java/org/drools/mvel/integrationtests/FireUntilHaltTest.java
index d734e8586b..4426692133 100644
---
a/drools-test-coverage/test-compiler-integration/src/test/java/org/drools/mvel/integrationtests/FireUntilHaltTest.java
+++
b/drools-test-coverage/test-compiler-integration/src/test/java/org/drools/mvel/integrationtests/FireUntilHaltTest.java
@@ -21,6 +21,9 @@ package org.drools.mvel.integrationtests;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.drools.mvel.compiler.Cheese;
import org.drools.mvel.compiler.Person;
@@ -161,4 +164,38 @@ public class FireUntilHaltTest {
assertThat(alive).as("Thread should have died!").isFalse();
assertThat(list.size()).isEqualTo(1);
}
+
+ @Test
+ public void testAllFactsProcessedBeforeHalt() throws Exception {
+ String drl = "package org.example.drools;\n" +
+ "\n" +
+ "global java.util.concurrent.CountDownLatch latch;\n" +
+ "\n" +
+ "rule \"R1\" when\n" +
+ " $s : String()\n" +
+ "then\n" +
+ " latch.countDown();\n" +
+ "end\n" +
+ "rule \"R2\" when\n" +
+ " $s : String()\n" +
+ "then\n" +
+ " latch.countDown();\n" +
+ "end\n";
+
+ KieBase kbase = KieBaseUtil.getKieBaseFromKieModuleFromDrl("test",
kieBaseTestConfiguration, drl);
+ KieSession ksession = kbase.newKieSession();
+
+ CountDownLatch latch = new CountDownLatch(4);
+ ksession.setGlobal("latch", latch);
+
+ Executors.newSingleThreadExecutor().execute(ksession::fireUntilHalt);
+
+ ksession.insert("aaa");
+ ksession.insert("bbb");
+
+ ksession.halt();
+
+ // the 2 facts inserted should be processed before halt
+ assertThat(latch.await(100, TimeUnit.MILLISECONDS)).isTrue();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]