I would like to submit a patch which speeds up forward rule
execution in the RETE engine.
It uses only slightly more memory, but reduces the time needed for my
dataset of about 100,000 triples from too long to measure to about 5
minutes.
It passes all tests so far. Am I missing something?
Tilo Fischer
Index: jena-core/src/main/java/com/hp/hpl/jena/reasoner/rulesys/impl/BindingVectorMultiSet.java
===================================================================
--- jena-core/src/main/java/com/hp/hpl/jena/reasoner/rulesys/impl/BindingVectorMultiSet.java (revision 0)
+++ jena-core/src/main/java/com/hp/hpl/jena/reasoner/rulesys/impl/BindingVectorMultiSet.java (working copy)
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hp.hpl.jena.reasoner.rulesys.impl;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import com.hp.hpl.jena.graph.Node;
+
+/**
+ * A multiset of BindingVectors divided into buckets, each matching a unique
+ * combination of values at the given indices; managed by RETEQueue
+ *
+ * @author Tilo Fischer
+ */
+public class BindingVectorMultiSet {
+
+ /**
+ * Inner class used to represent an updatable count.
+ * Formerly enclosed in RETEQueue
+ *
+ * @author <a href="mailto:[email protected]">Dave Reynolds</a>
+ */
+ protected static class Count {
+ /** the count */
+ int count;
+
+ /** Constructor */
+ public Count(int count) {
+ this.count = count;
+ }
+
+ /** Decrement the count value */
+ public void dec() {
+ count--;
+ }
+
+ /** Access count value */
+ public int getCount() {
+ return count;
+ }
+
+ /** Increment the count value */
+ public void inc() {
+ count++;
+ }
+
+ /** Set the count value */
+ public void setCount(int count) {
+ this.count = count;
+ }
+ }
+
+ /** Inner representation */
+ protected Map<BindingVector, Map<BindingVector, Count>> data = new HashMap<BindingVector, Map<BindingVector, Count>>();
+
+ /** An array of indices which mark the primary key */
+ protected byte[] matchIndices;
+
+ /**
+ * Constructor
+ *
+ * @param matchIndices
+ * a set of indices for matching
+ */
+ public BindingVectorMultiSet(byte[] matchIndices) {
+ this.matchIndices = matchIndices;
+ }
+
+ /**
+ * Increase the current quantity of env
+ *
+ * @param env
+ */
+ public void add(BindingVector env) {
+ Count c = get(env);
+ if (c == null) {
+ put(env, new Count(1));
+ } else {
+ c.inc();
+ }
+ }
+
+ /**
+ * Get current quantity of BindingVector env
+ *
+ * @param env
+ * @return
+ */
+ protected Count get(BindingVector env) {
+ Map<BindingVector, Count> set = getRawSubSet(env);
+ return (set == null ? null : set.get(env));
+ }
+
+ /**
+ * Create a BindingVector containing only values at matchIndices so it can
+ * be used as key
+ *
+ * @param env
+ * BindingVector to find the key for
+ * @return the key BindingVector
+ */
+ protected BindingVector getPartialEnv(BindingVector env) {
+ Node[] envNodes = env.getEnvironment();
+
+ Node[] partialEnv = new Node[envNodes.length];
+ for (byte i : matchIndices) {
+ partialEnv[i] = envNodes[i];
+ }
+ return new BindingVector(partialEnv);
+ }
+
+ /**
+ * Get the bucket into which env belongs if it exists
+ *
+ * @param env
+ * @return
+ */
+ protected Map<BindingVector, Count> getRawSubSet(BindingVector env) {
+ return data.get(getPartialEnv(env));
+ }
+
+ /**
+ * Get an iterator over all BindingVectors currently present which match
+ * with env
+ *
+ * @param env
+ * @return
+ */
+ public Iterator<BindingVector> getSubSet(BindingVector env) {
+ Map<BindingVector, Count> rawSubSet = getRawSubSet(env);
+ return (rawSubSet == null ? new HashMap<BindingVector, Count>(0)
+ : rawSubSet).keySet().iterator();
+
+ }
+
+ /**
+ * Set the quantity of env to a given Count value c
+ *
+ * @param env
+ * @param c
+ */
+ protected void put(BindingVector env, Count c) {
+ Map<BindingVector, Count> set = getRawSubSet(env);
+ if (set == null) {
+ set = new HashMap<BindingVector, Count>();
+ data.put(getPartialEnv(env), set);
+ }
+ set.put(env, c);
+
+ }
+
+ /**
+ * Copy all items from queue.data into data.
+ * Assumes this and queue share the same matchIndices.
+ *
+ * @param queue
+ */
+ public void putAll(BindingVectorMultiSet queue) {
+ for (Iterator<BindingVector> it = queue.data.keySet().iterator(); it
+ .hasNext();) {
+ BindingVector env = it.next();
+ Map<BindingVector, Count> set = getRawSubSet(env);
+ if (set == null) {
+ set = new HashMap<BindingVector, Count>();
+ data.put(env, set);
+ }
+ set.putAll(queue.data.get(env));
+ }
+ }
+
+ /**
+ * Decrease the quantity of env
+ *
+ * @param env
+ */
+ public void remove(BindingVector env) {
+ BindingVector key = getPartialEnv(env);
+ Map<BindingVector, Count> set = data.get(key);
+ if (set != null) {
+ Count c = set.get(env);
+ if (c != null) {
+ if (c.getCount() > 1) {
+ c.dec();
+ } else { // clean up
+ set.remove(env);
+ }
+ }
+ if (set.isEmpty()) {
+ data.remove(key);
+ }
+ }
+
+ }
+
+}
Index: jena-core/src/main/java/com/hp/hpl/jena/reasoner/rulesys/impl/RETEEngine.java
===================================================================
--- jena-core/src/main/java/com/hp/hpl/jena/reasoner/rulesys/impl/RETEEngine.java (revision 1457373)
+++ jena-core/src/main/java/com/hp/hpl/jena/reasoner/rulesys/impl/RETEEngine.java (working copy)
@@ -49,9 +49,14 @@
/** Queue of newly added triples waiting to be processed */
protected List<Triple> addsPending = new ArrayList<Triple>();
+ /** A HashSet of pending triples for faster lookup */
+ protected HashSet<Triple> addsHash = new HashSet<Triple>();
+
+
/** Queue of newly deleted triples waiting to be processed */
protected List<Triple> deletesPending = new ArrayList<Triple>();
+
/** The conflict set of rules waiting to fire */
protected RETEConflictSet conflictSet;
@@ -361,8 +366,9 @@
logger.debug("Add triple: " + PrintUtil.print(triple));
}
if (deletesPending.size() > 0) deletesPending.remove(triple);
- if (!addsPending.contains(triple)) // Experimental, not sure why it wasn't done before
+ if (!addsHash.contains(triple)) // Experimental, not sure why it wasn't done before
addsPending.add(triple);
+ addsHash.add(triple);
if (deduction) {
infGraph.addDeduction(triple);
}
@@ -375,6 +381,7 @@
*/
public synchronized void deleteTriple(Triple triple, boolean deduction) {
addsPending.remove(triple);
+ addsHash.remove(triple);
deletesPending.add(triple);
if (deduction) {
infGraph.getCurrentDeductionsGraph().delete(triple);
@@ -404,7 +411,9 @@
protected synchronized Triple nextAddTriple() {
int size = addsPending.size();
if (size > 0) {
- return addsPending.remove(size - 1);
+ Triple t=addsPending.remove(size - 1);
+ addsHash.remove(t);
+ return t;
}
return null;
}
Index: jena-core/src/main/java/com/hp/hpl/jena/reasoner/rulesys/impl/RETEQueue.java
===================================================================
--- jena-core/src/main/java/com/hp/hpl/jena/reasoner/rulesys/impl/RETEQueue.java (revision 1457373)
+++ jena-core/src/main/java/com/hp/hpl/jena/reasoner/rulesys/impl/RETEQueue.java (working copy)
@@ -23,159 +23,133 @@
import java.util.*;
/**
- * Represents one input left of a join node. The queue points to
- * a sibling queue representing the other leg which should be joined
- * against.
+ * Represents one input leg of a join node. The queue points to a sibling queue
+ * representing the other leg which should be joined against.
+ *
+ * @author <a href="mailto:[email protected]">Dave Reynolds</a>
+ * @version $Revision: 1.1 $ on $Date: 2009-06-29 08:55:33 $
*/
public class RETEQueue implements RETESinkNode, RETESourceNode {
-
- /** A multi-set of partially bound envionments */
- protected HashMap<BindingVector, Count> queue = new HashMap<BindingVector, Count>();
-
- /** A set of variable indices which should match between the two inputs */
- protected byte[] matchIndices;
-
- /** The sibling queue which forms the other half of the join node */
- protected RETEQueue sibling;
-
- /** The node that results should be passed on to */
- protected RETESinkNode continuation;
-
- /**
- * Constructor. The queue is not usable until it has been bound
- * to a sibling and a continuation node.
- * @param A set of variable indices which should match between the two inputs
- */
- public RETEQueue(byte[] matchIndices) {
- this.matchIndices = matchIndices;
- }
-
- /**
- * Constructor. The queue is not usable until it has been bound
- * to a sibling and a continuation node.
- * @param A List of variable indices which should match between the two inputs
- */
- public RETEQueue(List<? extends Byte> matchIndexList) {
- int len = matchIndexList.size();
- matchIndices = new byte[len];
- for (int i = 0; i < len; i++) {
- matchIndices[i] = matchIndexList.get(i).byteValue();
- }
- }
-
- /**
- * Set the sibling for this node.
- */
- public void setSibling(RETEQueue sibling) {
- this.sibling = sibling;
- }
-
- /**
- * Set the continuation node for this node (and any sibling)
- */
- @Override
- public void setContinuation(RETESinkNode continuation) {
- this.continuation = continuation;
- if (sibling != null) sibling.continuation = continuation;
- }
- /**
- * Propagate a token to this node.
- * @param env a set of variable bindings for the rule being processed.
- * @param isAdd distinguishes between add and remove operations.
- */
- @Override
- public void fire(BindingVector env, boolean isAdd) {
- // Store the new token in this store
- Count count = queue.get(env);
- if (count == null) {
- // no entry yet
- if (!isAdd) return;
- queue.put(env, new Count(1));
- } else {
- if (isAdd) {
- count.inc();
- } else {
- count.dec();
- if (count.getCount() == 0) {
- queue.remove(env);
- }
- }
- }
-
- // Cross match new token against the entries in the sibling queue
- for (Iterator<BindingVector> i = sibling.queue.keySet().iterator(); i.hasNext(); ) {
- Node[] candidate = i.next().getEnvironment();
- Node[] envNodes = env.getEnvironment();
- boolean matchOK = true;
- for (int j = 0; j < matchIndices.length; j++) {
- int index = matchIndices[j];
- if ( ! candidate[index].sameValueAs(envNodes[index])) {
- matchOK = false;
- break;
- }
- }
- if (matchOK) {
- // Instantiate a new extended environment
- Node[] newNodes = new Node[candidate.length];
- for (int j = 0; j < candidate.length; j++) {
- Node n = candidate[j];
- newNodes[j] = (n == null) ? envNodes[j] : n;
- }
- BindingVector newEnv = new BindingVector(newNodes);
- // Fire the successor processing
- continuation.fire(newEnv, isAdd);
- }
- }
- }
+ /**
+ * A multi-set of partially bound environments; the indices for matching are
+ * specified by matchIndices
+ */
+ protected BindingVectorMultiSet queue;
- /**
- * Inner class used to represent an updatable count.
- */
- protected static class Count {
- /** the count */
- int count;
-
- /** Constructor */
- public Count(int count) {
- this.count = count;
- }
-
- /** Access count value */
- public int getCount() {
- return count;
- }
-
- /** Increment the count value */
- public void inc() {
- count++;
- }
-
- /** Decrement the count value */
- public void dec() {
- count--;
- }
-
- /** Set the count value */
- public void setCount(int count) {
- this.count = count;
- }
- }
-
- /**
- * Clone this node in the network.
- * @param context the new context to which the network is being ported
- */
- @Override
- public RETENode clone(Map<RETENode, RETENode> netCopy, RETERuleContext context) {
- RETEQueue clone = (RETEQueue)netCopy.get(this);
- if (clone == null) {
- clone = new RETEQueue(matchIndices);
- netCopy.put(this, clone);
- clone.setSibling((RETEQueue)sibling.clone(netCopy, context));
- clone.setContinuation((RETESinkNode)continuation.clone(netCopy, context));
- clone.queue.putAll(queue);
- }
- return clone;
- }
+ /** A set of variable indices which should match between the two inputs */
+ protected byte[] matchIndices;
+
+ /** The sibling queue which forms the other half of the join node */
+ protected RETEQueue sibling;
+
+ /** The node that results should be passed on to */
+ protected RETESinkNode continuation;
+
+ /**
+ * Constructor. The queue is not usable until it has been bound to a sibling
+ * and a continuation node.
+ *
+ * @param A
+ * set of variable indices which should match between the two
+ * inputs
+ */
+ public RETEQueue(byte[] matchIndices) {
+ this.matchIndices = matchIndices;
+ this.queue = new BindingVectorMultiSet(matchIndices);
+ }
+
+ /**
+ * Constructor. The queue is not usable until it has been bound to a sibling
+ * and a continuation node.
+ *
+ * @param A
+ * List of variable indices which should match between the two
+ * inputs
+ */
+ public RETEQueue(List<? extends Byte> matchIndexList) {
+ int len = matchIndexList.size();
+ matchIndices = new byte[len];
+ for (int i = 0; i < len; i++) {
+ matchIndices[i] = matchIndexList.get(i).byteValue();
+ }
+ this.queue = new BindingVectorMultiSet(matchIndices);
+ }
+
+ /**
+ * Set the sibling for this node.
+ */
+ public void setSibling(RETEQueue sibling) {
+ this.sibling = sibling;
+ }
+
+ /**
+ * Set the continuation node for this node (and any sibling)
+ */
+ @Override
+ public void setContinuation(RETESinkNode continuation) {
+ this.continuation = continuation;
+ if (sibling != null)
+ sibling.continuation = continuation;
+ }
+
+ /**
+ * Propagate a token to this node.
+ *
+ * @param env
+ * a set of variable bindings for the rule being processed.
+ * @param isAdd
+ * distinguishes between add and remove operations.
+ */
+ @Override
+ public void fire(BindingVector env, boolean isAdd) {
+ // Store the new token in this store
+ if (isAdd) {
+ queue.add(env);
+ } else {
+ queue.remove(env);
+ }
+
+ // Cross match new token against the entries in the sibling queue
+
+ Node[] envNodes = env.getEnvironment();
+
+ for (Iterator<BindingVector> i = sibling.queue.getSubSet(env); i
+ .hasNext();) {
+ Node[] candidate = i.next().getEnvironment();
+ // matching is no longer required since sibling.queue.getSubSet(env)
+ // iterates only over the BindingVectors in the matching bucket
+
+ // Instantiate a new extended environment
+ Node[] newNodes = new Node[candidate.length];
+ for (int j = 0; j < candidate.length; j++) {
+ Node n = candidate[j];
+ newNodes[j] = (n == null) ? envNodes[j] : n;
+ }
+ BindingVector newEnv = new BindingVector(newNodes);
+ // Fire the successor processing
+ continuation.fire(newEnv, isAdd);
+ }
+ }
+
+ /**
+ * Clone this node in the network.
+ *
+ * @param context
+ * the new context to which the network is being ported
+ */
+ @Override
+ public RETENode clone(Map<RETENode, RETENode> netCopy,
+ RETERuleContext context) {
+ RETEQueue clone = (RETEQueue) netCopy.get(this);
+ if (clone == null) {
+ clone = new RETEQueue(matchIndices);
+ netCopy.put(this, clone);
+ clone.setSibling((RETEQueue) sibling.clone(netCopy, context));
+ clone.setContinuation((RETESinkNode) continuation.clone(netCopy,
+ context));
+ clone.queue.putAll(queue);
+ }
+ return clone;
+ }
}