This is an automated email from the ASF dual-hosted git repository.

andy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/jena.git


The following commit(s) were added to refs/heads/main by this push:
     new 03ae644572 GH-3044: HashJoin iterator construction no longer eagerly builds initial hash probe table.
03ae644572 is described below

commit 03ae6445720e4bb4d2af5f72f6ac8b160e72915b
Author: Claus Stadler <[email protected]>
AuthorDate: Wed Mar 5 23:09:15 2025 +0100

    GH-3044: HashJoin iterator construction no longer eagerly builds initial hash probe table.
---
 .../engine/iterator/QueryIterFilterExpr.java       |  4 +
 .../sparql/engine/join/AbstractIterHashJoin.java   | 58 +++++++------
 .../engine/main/solver/StageMatchTriple.java       | 14 ++++
 .../org/apache/jena/sparql/exec/RowSetOps.java     |  6 +-
 .../org/apache/jena/sparql/exec/RowSetStream.java  |  9 ++
 .../sparql/api/TestQueryExecutionTimeout2.java     | 97 +++++++++++++++++++++-
 .../apache/jena/tdb1/solver/StageMatchTuple.java   | 13 +++
 .../apache/jena/tdb2/solver/StageMatchTuple.java   | 13 +++
 8 files changed, 185 insertions(+), 29 deletions(-)

diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterFilterExpr.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterFilterExpr.java
index 893aa0f367..1b2aa55f22 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterFilterExpr.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterFilterExpr.java
@@ -21,6 +21,7 @@ package org.apache.jena.sparql.engine.iterator;
 import org.apache.jena.atlas.io.IndentedWriter;
 import org.apache.jena.atlas.lib.Lib;
 import org.apache.jena.atlas.logging.Log;
+import org.apache.jena.query.QueryCancelledException;
 import org.apache.jena.sparql.engine.ExecutionContext;
 import org.apache.jena.sparql.engine.QueryIterator;
 import org.apache.jena.sparql.engine.binding.Binding;
@@ -48,6 +49,9 @@ public class QueryIterFilterExpr extends QueryIterProcessBinding {
             if ( expr.isSatisfied(binding, super.getExecContext()) )
                 return binding;
             return null;
+        } catch (QueryCancelledException ex) {
+            ex.addSuppressed(new RuntimeException("Query cancelled exception."));
+            throw ex;
         } catch (ExprException ex) {
             // Some evaluation exception: should not happen.
             Log.warn(this, "Expression Exception in " + expr, ex);
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/join/AbstractIterHashJoin.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/join/AbstractIterHashJoin.java
index 639b7bb015..126be65a85 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/join/AbstractIterHashJoin.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/join/AbstractIterHashJoin.java
@@ -43,8 +43,8 @@ public abstract class AbstractIterHashJoin extends QueryIter2 {
     protected long s_trailerResults       = 0 ;       // Results from the trailer iterator.
     // See also stats in the probe table.
 
-    protected final JoinKey               joinKey ;
-    protected final MultiHashProbeTable   hashTable ;
+    protected JoinKey                   joinKey ;
+    protected MultiHashProbeTable       hashTable ;
 
     private QueryIterator               iterStream ;
     private Binding                     rowStream       = null ;
@@ -61,40 +61,46 @@ public abstract class AbstractIterHashJoin extends QueryIter2 {
     protected AbstractIterHashJoin(JoinKey initialJoinKey, QueryIterator probeIter, QueryIterator streamIter, ExecutionContext execCxt) {
         super(probeIter, streamIter, execCxt) ;
 
-        if ( initialJoinKey == null ) {
-            // This block computes an initial join key from the common variables of each iterator's first binding.
-            QueryIterPeek pProbe = QueryIterPeek.create(probeIter, execCxt) ;
-            QueryIterPeek pStream = QueryIterPeek.create(streamIter, execCxt) ;
+        this.joinKey = initialJoinKey ;
+        this.iterStream = streamIter ;
+        this.iterCurrent = null ;
+    }
 
-            Binding bLeft = pProbe.peek() ;
-            Binding bRight = pStream.peek() ;
+    private void doInit() {
+        QueryIterator probeIter = getLeft();
+        try {
+            if ( joinKey == null ) {
+                // This block computes an initial join key from the common variables of each iterator's first binding.
+                ExecutionContext execCxt = getExecContext();
+                QueryIterPeek pProbe = QueryIterPeek.create(probeIter, execCxt) ;
+                probeIter = pProbe ;
 
-            List<Var> varsLeft = Iter.toList(bLeft.vars()) ;
-            List<Var> varsRight = Iter.toList(bRight.vars()) ;
+                QueryIterPeek pStream = QueryIterPeek.create(iterStream, execCxt) ;
+                this.iterStream = pStream ;
 
-            initialJoinKey = JoinKey.create(varsLeft, varsRight) ;
+                Binding bLeft = pProbe.peek() ;
+                Binding bRight = pStream.peek() ;
 
-            probeIter = pProbe ;
-            streamIter = pStream ;
-        }
+                List<Var> varsLeft = Iter.toList(bLeft.vars()) ;
+                List<Var> varsRight = Iter.toList(bRight.vars()) ;
 
-        JoinKey maxJoinKey = null;
+                joinKey = JoinKey.create(varsLeft, varsRight) ;
+            }
 
-        this.joinKey = initialJoinKey ;
-        this.iterStream = streamIter ;
-        this.hashTable = new MultiHashProbeTable(maxJoinKey, initialJoinKey) ;
-        this.iterCurrent = null ;
-        buildHashTable(probeIter) ;
+            JoinKey maxJoinKey = null;
+            this.hashTable = new MultiHashProbeTable(maxJoinKey, joinKey) ;
+            buildHashTable(probeIter) ;
+        } finally {
+            probeIter.close();
+        }
     }
 
     private void buildHashTable(QueryIterator iter1) {
         state = Phase.HASH ;
-        for (; iter1.hasNext();) {
-            Binding row1 = iter1.next() ;
+        iter1.forEachRemaining(row1 -> {
             s_countProbe ++ ;
             hashTable.put(row1) ;
-        }
-        iter1.close() ;
+        });
         state = Phase.STREAM ;
     }
 
@@ -127,8 +133,10 @@ public abstract class AbstractIterHashJoin extends QueryIter2 {
         switch ( state ) {
             case DONE : return null ;
             case HASH :
-            case INIT :
                 throw new IllegalStateException() ;
+            case INIT :
+                doInit();
+                break;
             case TRAILER :
                 return doOneTail() ;
             case STREAM :
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/solver/StageMatchTriple.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/solver/StageMatchTriple.java
index 58e59fab66..f46ca62c0b 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/solver/StageMatchTriple.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/solver/StageMatchTriple.java
@@ -20,12 +20,14 @@ package org.apache.jena.sparql.engine.main.solver;
 
 import java.util.Iterator;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
 
 import org.apache.jena.atlas.iterator.Iter;
 import org.apache.jena.graph.Graph;
 import org.apache.jena.graph.Node;
 import org.apache.jena.graph.Triple;
+import org.apache.jena.query.QueryCancelledException;
 import org.apache.jena.sparql.core.Var;
 import org.apache.jena.sparql.engine.ExecutionContext;
 import org.apache.jena.sparql.engine.binding.Binding;
@@ -61,6 +63,18 @@ public class StageMatchTriple {
         // ExtendedIterator<Triple> graphIter = graph.find(s2, p2, o2) ;
         // Language tags.
         ExtendedIterator<Triple> graphIter = G.findByLang(graph, s2, p2, o2);
+
+        // Add cancel.
+        AtomicBoolean cancelSignal = execCxt.getCancelSignal();
+        if (cancelSignal != null) {
+            graphIter = graphIter.mapWith(x -> {
+                if (cancelSignal.get()) {
+                    throw new QueryCancelledException();
+                }
+                return x;
+            });
+        }
+
         ExtendedIterator<Binding> iter = graphIter.mapWith( r -> mapper(resultsBuilder, s, p, o, r)).filterDrop(Objects::isNull);
         return iter;
     }
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/RowSetOps.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/RowSetOps.java
index 8a4dad0765..06eb4c11c9 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/RowSetOps.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/RowSetOps.java
@@ -30,9 +30,9 @@ import org.apache.jena.sparql.ARQConstants;
 import org.apache.jena.sparql.core.Prologue;
 import org.apache.jena.sparql.resultset.ResultsWriter;
 
-/** 
+/**
  * RowSetOps - Convenience ways to call the various output formatters.
- * 
+ *
  * @see ResultSetMgr
  */
 
@@ -51,7 +51,7 @@ public class RowSetOps {
      * This operation consumes the RowSet.
      */
     public static long count(RowSet rowSet)
-    { return rowSet.rewindable().size(); }
+    { return rowSet.stream().count(); }
 
     /**
      * Output a result set in a text format.  The result set is consumed.
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/RowSetStream.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/RowSetStream.java
index f4d2d7d6e4..f816de00ca 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/RowSetStream.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/RowSetStream.java
@@ -21,6 +21,7 @@ package org.apache.jena.sparql.exec;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.function.Consumer;
 
 import org.apache.jena.atlas.iterator.Iter;
 import org.apache.jena.sparql.core.Var;
@@ -72,4 +73,12 @@ public class RowSetStream implements RowSet {
     public void close() {
         Iter.close(iterator);
     }
+
+    @Override
+    public void forEachRemaining(Consumer<? super Binding> action) {
+        iterator.forEachRemaining(b -> {
+            ++rowNumber;
+            action.accept(b);
+        });
+    }
 }
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionTimeout2.java b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionTimeout2.java
index 7744ce2840..70b791d1f8 100644
--- a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionTimeout2.java
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionTimeout2.java
@@ -22,13 +22,30 @@ import static org.apache.jena.atlas.lib.Lib.sleep ;
 import static org.junit.Assume.assumeFalse;
 
 import java.util.concurrent.TimeUnit;
+import java.util.stream.LongStream;
 
 import org.apache.jena.base.Sys ;
 import org.apache.jena.graph.Graph ;
-import org.apache.jena.query.* ;
+import org.apache.jena.graph.NodeFactory;
+import org.apache.jena.graph.Triple;
+import org.apache.jena.graph.impl.GraphBase;
+import org.apache.jena.query.Dataset;
+import org.apache.jena.query.DatasetFactory;
+import org.apache.jena.query.QueryCancelledException;
+import org.apache.jena.query.QueryExecution;
+import org.apache.jena.query.ResultSet;
+import org.apache.jena.query.ResultSetFormatter;
 import org.apache.jena.sparql.core.DatasetGraph ;
 import org.apache.jena.sparql.core.DatasetGraphFactory ;
 import org.apache.jena.sparql.engine.binding.Binding ;
+import org.apache.jena.sparql.engine.join.AbstractIterHashJoin;
+import org.apache.jena.sparql.engine.main.solver.StageMatchTriple;
+import org.apache.jena.sparql.exec.QueryExec;
+import org.apache.jena.sparql.exec.QueryExecDataset;
+import org.apache.jena.sparql.exec.RowSetOps;
+import org.apache.jena.util.iterator.ExtendedIterator;
+import org.apache.jena.util.iterator.NiceIterator;
+import org.apache.jena.util.iterator.WrappedIterator;
 import org.junit.Assert ;
 import org.junit.Before;
 import org.junit.Test ;
@@ -110,5 +127,83 @@ public class TestQueryExecutionTimeout2
     private int timeout(int time1, int time2) {
         return mayBeErratic ? time2 : time1 ;
     }
+
+    /**
+     * Test case for GH-3044.
+     * {@link AbstractIterHashJoin} used to eagerly populate a hash probe table on iterator construction,
+     * while {@link QueryExecDataset} held a lock that prevented async abort while the iterator was being constructed.
+     */
+    @Test(timeout = 5000, expected = QueryCancelledException.class)
+    public void test_timeout_hashJoin() {
+        // A very large virtual graph.
+        Graph graph = new GraphBase() {
+            @Override
+            protected ExtendedIterator<Triple> graphBaseFind(Triple triplePattern) {
+                return WrappedIterator.createNoRemove(LongStream.range(0, Long.MAX_VALUE)
+                    .mapToObj(i -> NodeFactory.createURI("http://www.example.org/r" + i))
+                    .peek(x -> {
+                        // Throttle binding generation to prevent going out-of-memory.
+                        // Bindings are likely to be stored in an in-memory hash probe table.
+                        try {
+                            Thread.sleep(1);
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    })
+                    .map(i -> Triple.create(i, i, i))
+                    .filter(triplePattern::matches)
+                    .iterator());
+            }
+        };
+
+        try (QueryExec qe = QueryExec
+            .graph(graph)
+            .timeout(100, TimeUnit.MILLISECONDS)
+            .query("""
+            SELECT * {
+                ?a ?b ?c .
+                  # When this test case was written, a hash probe table was created for the rhs.
+                  {
+                    ?c ?d ?e .
+                  }
+                UNION
+                  { BIND('x' AS ?x) }
+            }
+            """).build()) {
+            RowSetOps.count(qe.select());
+        }
+    }
+
+    /** Test to ensure timeouts are considered in {@link StageMatchTriple} when producing only empty joins from a large set of triples. */
+    @Test(expected = QueryCancelledException.class)
+    public void test_timeout_stageMatchTriple() {
+        // A very large virtual graph that never matches a concrete subject.
+        Graph graph = new GraphBase() {
+            @Override
+            protected ExtendedIterator<Triple> graphBaseFind(Triple t) {
+                if (t.getMatchSubject() != null && t.getMatchSubject().isConcrete()) {
+                    // Don't match any concrete subject
+                    return NiceIterator.emptyIterator();
+                } else {
+                    // Generate :sI :p :oI triples
+                    return WrappedIterator.createNoRemove(LongStream.range(0, Long.MAX_VALUE)
+                        .mapToObj(i -> Triple.create(NodeFactory.createURI("http://www.example.org/s" + i), t.getPredicate(), NodeFactory.createURI("http://www.example.org/o" + i)))
+                        .iterator());
+                }
+            }
+        };
+
+        try (QueryExec qe = QueryExec
+            .graph(graph)
+            .timeout(100, TimeUnit.MILLISECONDS)
+            .query("""
+            SELECT * {
+                ?a <urn:p> ?c .
+                ?c <urn:p> ?e .
+            }
+            """).build()) {
+            RowSetOps.count(qe.select());
+        }
+    }
 }
 
diff --git a/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/StageMatchTuple.java b/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/StageMatchTuple.java
index 299187610f..2ab35b13f5 100644
--- a/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/StageMatchTuple.java
+++ b/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/StageMatchTuple.java
@@ -20,6 +20,7 @@ package org.apache.jena.tdb1.solver;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.function.Predicate;
 
@@ -28,6 +29,7 @@ import org.apache.jena.atlas.lib.StrUtils;
 import org.apache.jena.atlas.lib.tuple.Tuple;
 import org.apache.jena.atlas.lib.tuple.TupleFactory;
 import org.apache.jena.graph.Node;
+import org.apache.jena.query.QueryCancelledException;
 import org.apache.jena.sparql.core.Var;
 import org.apache.jena.sparql.engine.ExecutionContext;
 import org.apache.jena.tdb1.store.NodeId;
@@ -68,6 +70,17 @@ class StageMatchTuple {
             iterMatches = x.iterator();
         }
 
+        // Add cancel check.
+        AtomicBoolean cancelSignal = execCxt.getCancelSignal();
+        if (cancelSignal != null) {
+            iterMatches = Iter.map(iterMatches, x -> {
+                 if (cancelSignal.get()) {
+                     throw new QueryCancelledException();
+                 }
+                 return x;
+            });
+        }
+
         // ** Allow a triple or quad filter here.
         if ( filter != null )
             iterMatches = Iter.filter(iterMatches, filter);
diff --git a/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/StageMatchTuple.java b/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/StageMatchTuple.java
index 0be831369c..3318b18390 100644
--- a/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/StageMatchTuple.java
+++ b/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/StageMatchTuple.java
@@ -20,6 +20,7 @@ package org.apache.jena.tdb2.solver;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.function.Predicate;
 
@@ -28,6 +29,7 @@ import org.apache.jena.atlas.lib.StrUtils;
 import org.apache.jena.atlas.lib.tuple.Tuple;
 import org.apache.jena.atlas.lib.tuple.TupleFactory;
 import org.apache.jena.graph.Node;
+import org.apache.jena.query.QueryCancelledException;
 import org.apache.jena.sparql.core.Var;
 import org.apache.jena.sparql.engine.ExecutionContext;
 import org.apache.jena.tdb2.store.NodeId;
@@ -68,6 +70,17 @@ class StageMatchTuple {
             iterMatches = x.iterator();
         }
 
+        // Add cancel check.
+        AtomicBoolean cancelSignal = execCxt.getCancelSignal();
+        if (cancelSignal != null) {
+            iterMatches = Iter.map(iterMatches, x -> {
+                 if (cancelSignal.get()) {
+                     throw new QueryCancelledException();
+                 }
+                 return x;
+            });
+        }
+
         // ** Allow a triple or quad filter here.
         if ( filter != null )
             iterMatches = Iter.filter(iterMatches, filter);

Reply via email to