This is an automated email from the ASF dual-hosted git repository.
andy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/jena.git
The following commit(s) were added to refs/heads/main by this push:
new 03ae644572 GH-3044: HashJoin iterator construction no longer eagerly
builds initial hash probe table.
03ae644572 is described below
commit 03ae6445720e4bb4d2af5f72f6ac8b160e72915b
Author: Claus Stadler <[email protected]>
AuthorDate: Wed Mar 5 23:09:15 2025 +0100
GH-3044: HashJoin iterator construction no longer eagerly builds initial
hash probe table.
---
.../engine/iterator/QueryIterFilterExpr.java | 4 +
.../sparql/engine/join/AbstractIterHashJoin.java | 58 +++++++------
.../engine/main/solver/StageMatchTriple.java | 14 ++++
.../org/apache/jena/sparql/exec/RowSetOps.java | 6 +-
.../org/apache/jena/sparql/exec/RowSetStream.java | 9 ++
.../sparql/api/TestQueryExecutionTimeout2.java | 97 +++++++++++++++++++++-
.../apache/jena/tdb1/solver/StageMatchTuple.java | 13 +++
.../apache/jena/tdb2/solver/StageMatchTuple.java | 13 +++
8 files changed, 185 insertions(+), 29 deletions(-)
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterFilterExpr.java
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterFilterExpr.java
index 893aa0f367..1b2aa55f22 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterFilterExpr.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterFilterExpr.java
@@ -21,6 +21,7 @@ package org.apache.jena.sparql.engine.iterator;
import org.apache.jena.atlas.io.IndentedWriter;
import org.apache.jena.atlas.lib.Lib;
import org.apache.jena.atlas.logging.Log;
+import org.apache.jena.query.QueryCancelledException;
import org.apache.jena.sparql.engine.ExecutionContext;
import org.apache.jena.sparql.engine.QueryIterator;
import org.apache.jena.sparql.engine.binding.Binding;
@@ -48,6 +49,9 @@ public class QueryIterFilterExpr extends
QueryIterProcessBinding {
if ( expr.isSatisfied(binding, super.getExecContext()) )
return binding;
return null;
+ } catch (QueryCancelledException ex) {
+ ex.addSuppressed(new RuntimeException("Query cancelled
exception."));
+ throw ex;
} catch (ExprException ex) {
// Some evaluation exception: should not happen.
Log.warn(this, "Expression Exception in " + expr, ex);
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/join/AbstractIterHashJoin.java
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/join/AbstractIterHashJoin.java
index 639b7bb015..126be65a85 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/join/AbstractIterHashJoin.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/join/AbstractIterHashJoin.java
@@ -43,8 +43,8 @@ public abstract class AbstractIterHashJoin extends QueryIter2
{
protected long s_trailerResults = 0 ; // Results from the
trailer iterator.
// See also stats in the probe table.
- protected final JoinKey joinKey ;
- protected final MultiHashProbeTable hashTable ;
+ protected JoinKey joinKey ;
+ protected MultiHashProbeTable hashTable ;
private QueryIterator iterStream ;
private Binding rowStream = null ;
@@ -61,40 +61,46 @@ public abstract class AbstractIterHashJoin extends
QueryIter2 {
protected AbstractIterHashJoin(JoinKey initialJoinKey, QueryIterator
probeIter, QueryIterator streamIter, ExecutionContext execCxt) {
super(probeIter, streamIter, execCxt) ;
- if ( initialJoinKey == null ) {
- // This block computes an initial join key from the common
variables of each iterator's first binding.
- QueryIterPeek pProbe = QueryIterPeek.create(probeIter, execCxt) ;
- QueryIterPeek pStream = QueryIterPeek.create(streamIter, execCxt) ;
+ this.joinKey = initialJoinKey ;
+ this.iterStream = streamIter ;
+ this.iterCurrent = null ;
+ }
- Binding bLeft = pProbe.peek() ;
- Binding bRight = pStream.peek() ;
+ private void doInit() {
+ QueryIterator probeIter = getLeft();
+ try {
+ if ( joinKey == null ) {
+ // This block computes an initial join key from the common
variables of each iterator's first binding.
+ ExecutionContext execCxt = getExecContext();
+ QueryIterPeek pProbe = QueryIterPeek.create(probeIter,
execCxt) ;
+ probeIter = pProbe ;
- List<Var> varsLeft = Iter.toList(bLeft.vars()) ;
- List<Var> varsRight = Iter.toList(bRight.vars()) ;
+ QueryIterPeek pStream = QueryIterPeek.create(iterStream,
execCxt) ;
+ this.iterStream = pStream ;
- initialJoinKey = JoinKey.create(varsLeft, varsRight) ;
+ Binding bLeft = pProbe.peek() ;
+ Binding bRight = pStream.peek() ;
- probeIter = pProbe ;
- streamIter = pStream ;
- }
+ List<Var> varsLeft = Iter.toList(bLeft.vars()) ;
+ List<Var> varsRight = Iter.toList(bRight.vars()) ;
- JoinKey maxJoinKey = null;
+ joinKey = JoinKey.create(varsLeft, varsRight) ;
+ }
- this.joinKey = initialJoinKey ;
- this.iterStream = streamIter ;
- this.hashTable = new MultiHashProbeTable(maxJoinKey, initialJoinKey) ;
- this.iterCurrent = null ;
- buildHashTable(probeIter) ;
+ JoinKey maxJoinKey = null;
+ this.hashTable = new MultiHashProbeTable(maxJoinKey, joinKey) ;
+ buildHashTable(probeIter) ;
+ } finally {
+ probeIter.close();
+ }
}
private void buildHashTable(QueryIterator iter1) {
state = Phase.HASH ;
- for (; iter1.hasNext();) {
- Binding row1 = iter1.next() ;
+ iter1.forEachRemaining(row1 -> {
s_countProbe ++ ;
hashTable.put(row1) ;
- }
- iter1.close() ;
+ });
state = Phase.STREAM ;
}
@@ -127,8 +133,10 @@ public abstract class AbstractIterHashJoin extends
QueryIter2 {
switch ( state ) {
case DONE : return null ;
case HASH :
- case INIT :
throw new IllegalStateException() ;
+ case INIT :
+ doInit();
+ break;
case TRAILER :
return doOneTail() ;
case STREAM :
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/solver/StageMatchTriple.java
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/solver/StageMatchTriple.java
index 58e59fab66..f46ca62c0b 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/solver/StageMatchTriple.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/solver/StageMatchTriple.java
@@ -20,12 +20,14 @@ package org.apache.jena.sparql.engine.main.solver;
import java.util.Iterator;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.jena.atlas.iterator.Iter;
import org.apache.jena.graph.Graph;
import org.apache.jena.graph.Node;
import org.apache.jena.graph.Triple;
+import org.apache.jena.query.QueryCancelledException;
import org.apache.jena.sparql.core.Var;
import org.apache.jena.sparql.engine.ExecutionContext;
import org.apache.jena.sparql.engine.binding.Binding;
@@ -61,6 +63,18 @@ public class StageMatchTriple {
// ExtendedIterator<Triple> graphIter = graph.find(s2, p2, o2) ;
// Language tags.
ExtendedIterator<Triple> graphIter = G.findByLang(graph, s2, p2, o2);
+
+ // Add cancel check.
+ AtomicBoolean cancelSignal = execCxt.getCancelSignal();
+ if (cancelSignal != null) {
+ graphIter = graphIter.mapWith(x -> {
+ if (cancelSignal.get()) {
+ throw new QueryCancelledException();
+ }
+ return x;
+ });
+ }
+
ExtendedIterator<Binding> iter = graphIter.mapWith( r ->
mapper(resultsBuilder, s, p, o, r)).filterDrop(Objects::isNull);
return iter;
}
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/RowSetOps.java
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/RowSetOps.java
index 8a4dad0765..06eb4c11c9 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/RowSetOps.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/RowSetOps.java
@@ -30,9 +30,9 @@ import org.apache.jena.sparql.ARQConstants;
import org.apache.jena.sparql.core.Prologue;
import org.apache.jena.sparql.resultset.ResultsWriter;
-/**
+/**
* RowSetOps - Convenience ways to call the various output formatters.
- *
+ *
* @see ResultSetMgr
*/
@@ -51,7 +51,7 @@ public class RowSetOps {
* This operation consumes the RowSet.
*/
public static long count(RowSet rowSet)
- { return rowSet.rewindable().size(); }
+ { return rowSet.stream().count(); }
/**
* Output a result set in a text format. The result set is consumed.
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/RowSetStream.java
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/RowSetStream.java
index f4d2d7d6e4..f816de00ca 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/RowSetStream.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/RowSetStream.java
@@ -21,6 +21,7 @@ package org.apache.jena.sparql.exec;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.function.Consumer;
import org.apache.jena.atlas.iterator.Iter;
import org.apache.jena.sparql.core.Var;
@@ -72,4 +73,12 @@ public class RowSetStream implements RowSet {
public void close() {
Iter.close(iterator);
}
+
+ @Override
+ public void forEachRemaining(Consumer<? super Binding> action) {
+ iterator.forEachRemaining(b -> {
+ ++rowNumber;
+ action.accept(b);
+ });
+ }
}
diff --git
a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionTimeout2.java
b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionTimeout2.java
index 7744ce2840..70b791d1f8 100644
---
a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionTimeout2.java
+++
b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionTimeout2.java
@@ -22,13 +22,30 @@ import static org.apache.jena.atlas.lib.Lib.sleep ;
import static org.junit.Assume.assumeFalse;
import java.util.concurrent.TimeUnit;
+import java.util.stream.LongStream;
import org.apache.jena.base.Sys ;
import org.apache.jena.graph.Graph ;
-import org.apache.jena.query.* ;
+import org.apache.jena.graph.NodeFactory;
+import org.apache.jena.graph.Triple;
+import org.apache.jena.graph.impl.GraphBase;
+import org.apache.jena.query.Dataset;
+import org.apache.jena.query.DatasetFactory;
+import org.apache.jena.query.QueryCancelledException;
+import org.apache.jena.query.QueryExecution;
+import org.apache.jena.query.ResultSet;
+import org.apache.jena.query.ResultSetFormatter;
import org.apache.jena.sparql.core.DatasetGraph ;
import org.apache.jena.sparql.core.DatasetGraphFactory ;
import org.apache.jena.sparql.engine.binding.Binding ;
+import org.apache.jena.sparql.engine.join.AbstractIterHashJoin;
+import org.apache.jena.sparql.engine.main.solver.StageMatchTriple;
+import org.apache.jena.sparql.exec.QueryExec;
+import org.apache.jena.sparql.exec.QueryExecDataset;
+import org.apache.jena.sparql.exec.RowSetOps;
+import org.apache.jena.util.iterator.ExtendedIterator;
+import org.apache.jena.util.iterator.NiceIterator;
+import org.apache.jena.util.iterator.WrappedIterator;
import org.junit.Assert ;
import org.junit.Before;
import org.junit.Test ;
@@ -110,5 +127,83 @@ public class TestQueryExecutionTimeout2
private int timeout(int time1, int time2) {
return mayBeErratic ? time2 : time1 ;
}
+
+ /**
+ * Test case for GH-3044.
+ * {@link AbstractIterHashJoin} used to eagerly populate a hash probe
table on iterator construction,
+ * while {@link QueryExecDataset} held a lock that prevented async abort
while the iterator was being constructed.
+ */
+ @Test(timeout = 5000, expected = QueryCancelledException.class)
+ public void test_timeout_hashJoin() {
+ // A very large virtual graph.
+ Graph graph = new GraphBase() {
+ @Override
+ protected ExtendedIterator<Triple> graphBaseFind(Triple
triplePattern) {
+ return WrappedIterator.createNoRemove(LongStream.range(0,
Long.MAX_VALUE)
+ .mapToObj(i ->
NodeFactory.createURI("http://www.example.org/r" + i))
+ .peek(x -> {
+ // Throttle binding generation to prevent going
out-of-memory.
+ // Bindings are likely to be stored in an in-memory
hash probe table.
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .map(i -> Triple.create(i, i, i))
+ .filter(triplePattern::matches)
+ .iterator());
+ }
+ };
+
+ try (QueryExec qe = QueryExec
+ .graph(graph)
+ .timeout(100, TimeUnit.MILLISECONDS)
+ .query("""
+ SELECT * {
+ ?a ?b ?c .
+ # When this test case was written, a hash probe table was
created for the rhs.
+ {
+ ?c ?d ?e .
+ }
+ UNION
+ { BIND('x' AS ?x) }
+ }
+ """).build()) {
+ RowSetOps.count(qe.select());
+ }
+ }
+
+ /** Test to ensure timeouts are considered in {@link StageMatchTriple}
when producing only empty joins from a large set of triples. */
+ @Test(expected = QueryCancelledException.class)
+ public void test_timeout_stageMatchTriple() {
+ // A very large virtual graph that never matches a concrete subject.
+ Graph graph = new GraphBase() {
+ @Override
+ protected ExtendedIterator<Triple> graphBaseFind(Triple t) {
+ if (t.getMatchSubject() != null &&
t.getMatchSubject().isConcrete()) {
+ // Don't match any concrete subject
+ return NiceIterator.emptyIterator();
+ } else {
+ // Generate :sI :p :oI triples
+ return WrappedIterator.createNoRemove(LongStream.range(0,
Long.MAX_VALUE)
+ .mapToObj(i ->
Triple.create(NodeFactory.createURI("http://www.example.org/s" + i),
t.getPredicate(), NodeFactory.createURI("http://www.example.org/o" + i)))
+ .iterator());
+ }
+ }
+ };
+
+ try (QueryExec qe = QueryExec
+ .graph(graph)
+ .timeout(100, TimeUnit.MILLISECONDS)
+ .query("""
+ SELECT * {
+ ?a <urn:p> ?c .
+ ?c <urn:p> ?e .
+ }
+ """).build()) {
+ RowSetOps.count(qe.select());
+ }
+ }
}
diff --git
a/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/StageMatchTuple.java
b/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/StageMatchTuple.java
index 299187610f..2ab35b13f5 100644
--- a/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/StageMatchTuple.java
+++ b/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/StageMatchTuple.java
@@ -20,6 +20,7 @@ package org.apache.jena.tdb1.solver;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -28,6 +29,7 @@ import org.apache.jena.atlas.lib.StrUtils;
import org.apache.jena.atlas.lib.tuple.Tuple;
import org.apache.jena.atlas.lib.tuple.TupleFactory;
import org.apache.jena.graph.Node;
+import org.apache.jena.query.QueryCancelledException;
import org.apache.jena.sparql.core.Var;
import org.apache.jena.sparql.engine.ExecutionContext;
import org.apache.jena.tdb1.store.NodeId;
@@ -68,6 +70,17 @@ class StageMatchTuple {
iterMatches = x.iterator();
}
+ // Add cancel check.
+ AtomicBoolean cancelSignal = execCxt.getCancelSignal();
+ if (cancelSignal != null) {
+ iterMatches = Iter.map(iterMatches, x -> {
+ if (cancelSignal.get()) {
+ throw new QueryCancelledException();
+ }
+ return x;
+ });
+ }
+
// ** Allow a triple or quad filter here.
if ( filter != null )
iterMatches = Iter.filter(iterMatches, filter);
diff --git
a/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/StageMatchTuple.java
b/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/StageMatchTuple.java
index 0be831369c..3318b18390 100644
--- a/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/StageMatchTuple.java
+++ b/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/StageMatchTuple.java
@@ -20,6 +20,7 @@ package org.apache.jena.tdb2.solver;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -28,6 +29,7 @@ import org.apache.jena.atlas.lib.StrUtils;
import org.apache.jena.atlas.lib.tuple.Tuple;
import org.apache.jena.atlas.lib.tuple.TupleFactory;
import org.apache.jena.graph.Node;
+import org.apache.jena.query.QueryCancelledException;
import org.apache.jena.sparql.core.Var;
import org.apache.jena.sparql.engine.ExecutionContext;
import org.apache.jena.tdb2.store.NodeId;
@@ -68,6 +70,17 @@ class StageMatchTuple {
iterMatches = x.iterator();
}
+ // Add cancel check.
+ AtomicBoolean cancelSignal = execCxt.getCancelSignal();
+ if (cancelSignal != null) {
+ iterMatches = Iter.map(iterMatches, x -> {
+ if (cancelSignal.get()) {
+ throw new QueryCancelledException();
+ }
+ return x;
+ });
+ }
+
// ** Allow a triple or quad filter here.
if ( filter != null )
iterMatches = Iter.filter(iterMatches, filter);