This is an automated email from the ASF dual-hosted git repository.
andy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/jena.git
The following commit(s) were added to refs/heads/main by this push:
new f6aa0ea7be GH-3535: Improved cancellation, more testing, possible fix
for DatasetText.abort.
f6aa0ea7be is described below
commit f6aa0ea7be9a59e525875b7a89e6c72fff201875
Author: Claus Stadler <[email protected]>
AuthorDate: Wed Nov 19 14:58:45 2025 +0100
GH-3535: Improved cancellation, more testing, possible fix for
DatasetText.abort.
Co-authored-by: Andy Seaborne <[email protected]>
---
.../apache/jena/query/QueryCancelledException.java | 1 +
.../jena/sparql/engine/iterator/IterAbortable.java | 68 +++--
.../sparql/engine/iterator/QueryIterFailed.java | 65 +++++
.../sparql/engine/iterator/QueryIteratorBase.java | 2 +-
.../sparql/engine/main/StageGeneratorGeneric.java | 21 +-
.../engine/main/solver/PatternMatchData.java | 4 +-
.../jena/sparql/engine/main/solver/SolverLib.java | 5 +-
.../apache/jena/sparql/exec/QueryExecDataset.java | 16 +-
.../src/main/java/org/apache/jena/system/G.java | 19 ++
.../jena/sparql/api/TestQueryExecutionCancel.java | 32 ++-
.../GenericGeometryPropertyFunction.java | 8 +-
.../geo/topological/GenericPropertyFunction.java | 26 +-
.../implementation/access/AccessGeoSPARQL.java | 65 +++--
.../implementation/access/AccessWGS84.java | 13 +-
.../geosparql/spatial/SpatialIndexFindUtils.java | 23 +-
.../geosparql/spatial/index/v2/STRtreeUtils.java | 2 +-
.../GenericSpatialPropertyFunction.java | 6 +-
.../exec/AbstractTestQueryExecutionCancel.java | 294 ++++++++++-----------
.../jena/sparql/exec/TS_QueryExecutionCancel.java | 16 +-
.../sparql/exec/TestQueryExecutionCancel_ARQ.java | 14 +-
.../sparql/exec/TestQueryExecutionCancel_TDB2.java | 14 +-
.../exec/TestQueryExecutionCancel_TDB2_Text.java | 69 +++++
.../apache/jena/tdb1/solver/OpExecutorTDB1.java | 23 +-
.../apache/jena/tdb1/solver/PatternMatchTDB1.java | 2 +-
.../org/apache/jena/tdb1/solver/SolverLibTDB.java | 7 +-
.../java/org/apache/jena/tdb1/solver/SolverRX.java | 7 +
.../apache/jena/tdb1/solver/StageMatchTuple.java | 11 +-
.../apache/jena/tdb2/solver/OpExecutorTDB2.java | 33 ++-
.../apache/jena/tdb2/solver/PatternMatchTDB2.java | 2 +-
.../org/apache/jena/tdb2/solver/SolverLibTDB.java | 5 +-
.../java/org/apache/jena/tdb2/solver/SolverRX.java | 7 +
.../apache/jena/tdb2/solver/StageMatchTuple.java | 11 +-
.../apache/jena/query/text/DatasetGraphText.java | 8 +-
33 files changed, 578 insertions(+), 321 deletions(-)
diff --git
a/jena-arq/src/main/java/org/apache/jena/query/QueryCancelledException.java
b/jena-arq/src/main/java/org/apache/jena/query/QueryCancelledException.java
index e1361cb11c..200c4a645a 100644
--- a/jena-arq/src/main/java/org/apache/jena/query/QueryCancelledException.java
+++ b/jena-arq/src/main/java/org/apache/jena/query/QueryCancelledException.java
@@ -22,4 +22,5 @@ package org.apache.jena.query;
public class QueryCancelledException extends QueryExecException
{
public QueryCancelledException() {}
+ public QueryCancelledException(Throwable cause) { super(cause) ; }
}
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/IterAbortable.java
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/IterAbortable.java
index 45dd3ad52c..75870a5769 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/IterAbortable.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/IterAbortable.java
@@ -19,6 +19,8 @@
package org.apache.jena.sparql.engine.iterator;
import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
import org.apache.jena.atlas.iterator.IteratorCloseable;
import org.apache.jena.atlas.iterator.IteratorWrapper;
@@ -29,51 +31,69 @@ import org.apache.jena.query.QueryCancelledException;
* Iterator that adds an abort operation which can be called at any time,
* including from another thread, and causes the iterator to throw an exception
* when next touched (hasNext, next).
+ *
+ * The abort signal may be shared across a set of components.
+ * Hence, an abort on one component will be visible to all shared components.
*/
public class IterAbortable<T> extends IteratorWrapper<T> implements Abortable,
IteratorCloseable<T> {
- private volatile boolean abortFlag = false;
- private boolean haveAborted = false;
+ /**
+ * The cancel signal. Thread safe. Never null.
+ * May be set by external callers or from this instance.
+ */
+ private AtomicBoolean cancelSignal;
public IterAbortable(Iterator<T> iterator) {
+ this(iterator, null);
+ }
+
+ protected IterAbortable(Iterator<T> iterator, AtomicBoolean cancelSignal) {
super(iterator);
+ this.cancelSignal = (cancelSignal != null) ? cancelSignal : new
AtomicBoolean(false);
+ }
+
+ public static <T> IterAbortable<T> wrap(Iterator<T> it, AtomicBoolean
cancelSignal) {
+ return new IterAbortable<>(it, cancelSignal);
+ }
+
+ private boolean isAborted() {
+ return cancelSignal.get() || Thread.currentThread().isInterrupted();
+ }
+
+ private void checkAbort() {
+ if ( isAborted() ) {
+ throw new QueryCancelledException();
+ }
}
/** Can call asynchronously at any time */
@Override
public void abort() {
- abortFlag = true;
- }
-
- private void execAbort() {
- if ( ! haveAborted )
- close();
- haveAborted = true;
+ cancelSignal.set(true);
}
@Override
public boolean hasNext() {
- if ( abortFlag ) {
- execAbort();
- throw new QueryCancelledException();
- }
- return iterator.hasNext();
+ checkAbort();
+ return get().hasNext();
}
@Override
public T next() {
- if ( abortFlag ) {
- execAbort();
- throw new QueryCancelledException();
- }
- return iterator.next();
+ checkAbort();
+ return get().next();
}
@Override
public void remove() {
- if ( abortFlag ) {
- execAbort();
- throw new QueryCancelledException();
- }
- iterator.remove();
+ checkAbort();
+ get().remove();
+ }
+
+ @Override
+ public void forEachRemaining(Consumer<? super T> action) {
+ get().forEachRemaining(item -> {
+ checkAbort();
+ action.accept(item);
+ });
}
}
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterFailed.java
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterFailed.java
new file mode 100644
index 0000000000..d8bc069c98
--- /dev/null
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterFailed.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.engine.iterator;
+
+import org.apache.jena.query.QueryCancelledException;
+import org.apache.jena.query.QueryExecException;
+import org.apache.jena.sparql.engine.ExecutionContext;
+import org.apache.jena.sparql.engine.QueryIterator;
+import org.apache.jena.sparql.engine.binding.Binding;
+
+/**
+ * A helper iterator for when an intermediate step in the construction of a
compound iterator fails.
+ */
+public class QueryIterFailed
+ extends QueryIter1
+{
+ private Throwable cause;
+
+ public QueryIterFailed(QueryIterator input, ExecutionContext execCxt,
Throwable cause) {
+ super(input, execCxt);
+ this.cause = cause;
+ }
+
+ @Override
+ protected void requestSubCancel() {
+ }
+
+ @Override
+ protected void closeSubIterator() {
+ }
+
+ @Override
+ protected boolean hasNextBinding() {
+ throw buildException();
+ }
+
+ @Override
+ protected Binding moveToNextBinding() {
+ throw buildException();
+ }
+
+ protected QueryExecException buildException() {
+ if (cause instanceof QueryCancelledException e) {
+ return new QueryCancelledException(e);
+ } else {
+ return new QueryExecException(cause);
+ }
+ }
+}
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIteratorBase.java
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIteratorBase.java
index f156b065d2..f371da955c 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIteratorBase.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIteratorBase.java
@@ -70,7 +70,7 @@ public abstract class QueryIteratorBase
}
private boolean requestingCancel() {
- return (requestingCancel != null && requestingCancel.get()) ||
Thread.interrupted() ;
+ return (requestingCancel != null && requestingCancel.get()) ||
Thread.currentThread().isInterrupted() ;
}
private void haveCancelled() {}
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/StageGeneratorGeneric.java
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/StageGeneratorGeneric.java
index a804fc235a..fff34c11b5 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/StageGeneratorGeneric.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/StageGeneratorGeneric.java
@@ -26,6 +26,7 @@ import org.apache.jena.sparql.core.Substitute ;
import org.apache.jena.sparql.engine.ExecutionContext ;
import org.apache.jena.sparql.engine.QueryIterator ;
import org.apache.jena.sparql.engine.binding.Binding ;
+import org.apache.jena.sparql.engine.iterator.QueryIterFailed;
import org.apache.jena.sparql.engine.iterator.QueryIterPeek ;
import org.apache.jena.sparql.engine.main.solver.PatternMatchData;
import org.apache.jena.sparql.engine.optimizer.reorder.ReorderLib ;
@@ -60,10 +61,7 @@ public class StageGeneratorGeneric implements StageGenerator
{
QueryIterator input, ExecutionContext
execCxt) {
Explain.explain(pattern, execCxt.getContext()) ;
- if ( ! input.hasNext() )
- return input ;
-
- if ( reorder != null && pattern.size() >= 2 ) {
+ if ( reorder != null && pattern.size() >= 2 ) {
// If pattern size is 0 or 1, nothing to do.
BasicPattern bgp2 = pattern ;
@@ -73,19 +71,26 @@ public class StageGeneratorGeneric implements
StageGenerator {
// And now use this one
input = peek ;
Binding b ;
+ // Eager access may fail e.g. due to timeout.
try {
b = peek.peek() ;
} catch (Exception e) {
- // Close peek iterator on failure e.g. due to cancellation.
- peek.close() ;
- e.addSuppressed(new RuntimeException("Error during
peek().")) ;
- throw e ;
+ return new QueryIterFailed(input, execCxt, e);
}
bgp2 = Substitute.substitute(pattern, b) ;
}
ReorderProc reorderProc = reorder.reorderIndexes(bgp2) ;
pattern = reorderProc.reorder(pattern) ;
+ } else {
+ // Eager access may fail e.g. due to timeout.
+ try {
+ if ( ! input.hasNext() )
+ return input ;
+ } catch (Exception e) {
+ return new QueryIterFailed(input, execCxt, e);
+ }
}
+
Explain.explain("Reorder/generic", pattern, execCxt.getContext()) ;
return PatternMatchData.execute(execCxt.getActiveGraph(), pattern,
input, null, execCxt) ;
}
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/solver/PatternMatchData.java
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/solver/PatternMatchData.java
index 6ed1f1e64e..45b8b025b8 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/solver/PatternMatchData.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/solver/PatternMatchData.java
@@ -59,7 +59,7 @@ public class PatternMatchData {
// [Match] Missing filter.
chain = SolverRX3.rdfStarTriple(chain, triple, execCxt);
- chain = SolverLib.makeAbortable(chain, killList);
+ chain = SolverLib.makeAbortable(chain, killList,
execCxt.getCancelSignal());
}
// "input" will be closed by QueryIterAbortable but is otherwise
unused.
@@ -98,7 +98,7 @@ public class PatternMatchData {
// [Match] Missing filter.
chain = SolverRX4.rdfStarQuad(chain, graphNode, triple, execCxt);
- chain = SolverLib.makeAbortable(chain, killList);
+ chain = SolverLib.makeAbortable(chain, killList,
execCxt.getCancelSignal());
}
// "input" will be closed by QueryIterAbortable but is otherwise
unused.
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/solver/SolverLib.java
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/solver/SolverLib.java
index a4ff36cfac..a6cc359093 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/solver/SolverLib.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/solver/SolverLib.java
@@ -20,6 +20,7 @@ package org.apache.jena.sparql.engine.main.solver;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jena.graph.Node;
import org.apache.jena.graph.Triple;
@@ -35,10 +36,10 @@ public class SolverLib {
* Create an abortable iterator, storing it in the killList.
* Just return the input iterator if killList is null.
*/
- public static <T> Iterator<T> makeAbortable(Iterator<T> iter,
List<Abortable> killList) {
+ public static <T> Iterator<T> makeAbortable(Iterator<T> iter,
List<Abortable> killList, AtomicBoolean cancel) {
if ( killList == null )
return iter;
- IterAbortable<T> k = new IterAbortable<>(iter);
+ IterAbortable<T> k = IterAbortable.wrap(iter, cancel);
killList.add(k);
return k;
}
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java
index 32fdd21407..35a883345d 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java
@@ -378,15 +378,13 @@ public class QueryExecDataset implements QueryExec
class TimeoutCallback implements Runnable {
@Override
public void run() {
- synchronized (lockTimeout) {
- // Abort query if and only if we are the expected callback.
- // If the first row has appeared, and we are removing timeout1
- // callback, it still may go off so it needs to check here
- // it's still wanted.
- if ( expectedCallback.get() == this ) {
- if ( cancelSignal != null )
- cancelSignal.set(true);
- }
+ // Abort query if and only if we are the expected callback.
+ // If the first row has appeared, and we are removing timeout1
+ // callback, it still may go off so it needs to check here
+ // it's still wanted.
+ if ( expectedCallback.get() == this ) {
+ if ( cancelSignal != null )
+ cancelSignal.set(true);
}
}
}
diff --git a/jena-arq/src/main/java/org/apache/jena/system/G.java
b/jena-arq/src/main/java/org/apache/jena/system/G.java
index 5c4c15b1c9..3e6ca18fa0 100644
--- a/jena-arq/src/main/java/org/apache/jena/system/G.java
+++ b/jena-arq/src/main/java/org/apache/jena/system/G.java
@@ -21,6 +21,7 @@ package org.apache.jena.system;
import static org.apache.jena.graph.Node.ANY;
import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
@@ -34,10 +35,12 @@ import org.apache.jena.rdf.model.impl.Util;
import org.apache.jena.riot.out.NodeFmtLib;
import org.apache.jena.sparql.core.DatasetGraph;
import org.apache.jena.sparql.core.Quad;
+import org.apache.jena.sparql.engine.iterator.IterAbortable;
import org.apache.jena.sparql.graph.NodeConst;
import org.apache.jena.sparql.util.graph.GNode;
import org.apache.jena.sparql.util.graph.GraphList;
import org.apache.jena.util.iterator.ExtendedIterator;
+import org.apache.jena.util.iterator.WrappedIterator;
/**
* A library of functions for working with {@link Graph graphs}. Internally,
all
@@ -1131,4 +1134,20 @@ public class G {
return false ;
return true ;
}
+
+ // --- Cancellable variants ---
+
+ /**
+ * Cancellable variant of {@link Graph#find(Node, Node, Node)}.
+ * Wraps the iterator returned by {@code graph.find()} with the provided
cancel signal.
+ * If the cancel signal is null then no wrapping is applied.
+ */
+ public static ExtendedIterator<Triple> find(AtomicBoolean cancel, Graph
graph, Node subject, Node predicate, Node object) {
+ Objects.requireNonNull(graph, "graph");
+ ExtendedIterator<Triple> it = graph.find(subject, predicate, object);
+ if (cancel == null) {
+ return it;
+ }
+ return WrappedIterator.create(IterAbortable.wrap(it, cancel));
+ }
}
diff --git
a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java
b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java
index 652465dcab..80a42dad64 100644
---
a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java
+++
b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java
@@ -82,13 +82,13 @@ public class TestQueryExecutionCancel {
@Test
public void test_Cancel_API_1()
{
- try(QueryExecution qExec = makeQExec("SELECT * {?s ?p ?o}")) {
+ try(QueryExecution qExec = makeQExec("SELECT * {?s ?p ?o}")) {
ResultSet rs = qExec.execSelect();
assertTrue(rs.hasNext());
qExec.abort();
- assertThrows(QueryCancelledException.class,
- ()-> rs.nextSolution(),
- ()->"Results not expected
after cancel.");
+ assertThrows(QueryCancelledException.class,
+ ()-> rs.nextSolution(),
+ ()->"Results not expected after cancel.");
}
}
@@ -100,8 +100,8 @@ public class TestQueryExecutionCancel {
assertTrue(rs.hasNext());
qExec.abort();
assertThrows(QueryCancelledException.class,
- ()-> rs.hasNext(),
- ()->"Results not expected
after cancel.");
+ ()-> rs.hasNext(),
+ ()->"Results not expected after cancel.");
}
}
@@ -133,7 +133,7 @@ public class TestQueryExecutionCancel {
public void test_Cancel_API_5() {
try (QueryExecution qe = QueryExecutionFactory.create("SELECT * { ?s
?p ?o }", m)) {
qe.abort();
- assertThrows(QueryCancelledException.class, ()->
ResultSetFormatter.consume(qe.execSelect()));
+ assertThrows(QueryCancelledException.class, ()->
ResultSetFormatter.consume(qe.execSelect()));
}
}
@@ -346,6 +346,24 @@ public class TestQueryExecutionCancel {
test_cancel_concurrent("SELECT * { { ?a ?b ?c . ?d ?e ?f . ?g ?h ?i .
} UNION { BIND('x' AS ?x) } }");
}
+ @Test
+ @Timeout(value = 10000, unit=TimeUnit.MILLISECONDS)
+ public void test_cancel_concurrent_3() {
+ test_cancel_concurrent("""
+ SELECT * {
+ ?s ?p ?o
+ {
+ SELECT * {
+ ?s ?p ?o
+ }
+ LIMIT 1
+ }
+ ?s ?p ?o
+ }
+ LIMIT 1
+ """);
+ }
+
private static void test_cancel_concurrent(String queryString) {
int maxCancelDelayInMillis = 100;
diff --git
a/jena-geosparql/src/main/java/org/apache/jena/geosparql/geo/topological/GenericGeometryPropertyFunction.java
b/jena-geosparql/src/main/java/org/apache/jena/geosparql/geo/topological/GenericGeometryPropertyFunction.java
index 91d5b4f019..dbce73a631 100644
---
a/jena-geosparql/src/main/java/org/apache/jena/geosparql/geo/topological/GenericGeometryPropertyFunction.java
+++
b/jena-geosparql/src/main/java/org/apache/jena/geosparql/geo/topological/GenericGeometryPropertyFunction.java
@@ -17,6 +17,8 @@
*/
package org.apache.jena.geosparql.geo.topological;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.jena.datatypes.DatatypeFormatException;
import org.apache.jena.geosparql.implementation.GeometryWrapper;
import org.apache.jena.geosparql.implementation.access.AccessGeoSPARQL;
@@ -103,8 +105,9 @@ public abstract class GenericGeometryPropertyFunction
extends PFuncSimple {
private QueryIterator subjectUnbound(Binding binding, Node subject, Node
predicate, Node object, ExecutionContext execCxt) {
Graph graph = execCxt.getActiveGraph();
+ AtomicBoolean cancel = execCxt.getCancelSignal();
- ExtendedIterator<Triple> subjectTriples =
AccessGeoSPARQL.findSpecificGeoLiterals(graph);
+ ExtendedIterator<Triple> subjectTriples =
AccessGeoSPARQL.findSpecificGeoLiterals(cancel, graph);
Var subjectVar = Var.alloc(subject.getName());
ExtendedIterator<Binding> iterator = subjectTriples
.mapWith(Triple::getSubject)
@@ -131,8 +134,9 @@ public abstract class GenericGeometryPropertyFunction
extends PFuncSimple {
private QueryIterator bothUnbound(Binding binding, Node subject, Node
predicate, Node object, ExecutionContext execCxt) {
Graph graph = execCxt.getActiveGraph();
+ AtomicBoolean cancel = execCxt.getCancelSignal();
- ExtendedIterator<Triple> subjectTriples =
AccessGeoSPARQL.findSpecificGeoLiterals(graph);
+ ExtendedIterator<Triple> subjectTriples =
AccessGeoSPARQL.findSpecificGeoLiterals(cancel, graph);
Var subjectVar = Var.alloc(subject.getName());
ExtendedIterator<Binding> iterator = subjectTriples
.mapWith(Triple::getSubject)
diff --git
a/jena-geosparql/src/main/java/org/apache/jena/geosparql/geo/topological/GenericPropertyFunction.java
b/jena-geosparql/src/main/java/org/apache/jena/geosparql/geo/topological/GenericPropertyFunction.java
index 9b913abfc9..245264d3c7 100644
---
a/jena-geosparql/src/main/java/org/apache/jena/geosparql/geo/topological/GenericPropertyFunction.java
+++
b/jena-geosparql/src/main/java/org/apache/jena/geosparql/geo/topological/GenericPropertyFunction.java
@@ -19,6 +19,7 @@ package org.apache.jena.geosparql.geo.topological;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jena.atlas.iterator.Iter;
import org.apache.jena.geosparql.geof.topological.GenericFilterFunction;
@@ -112,9 +113,10 @@ public abstract class GenericPropertyFunction extends
PFuncSimple {
private QueryIterator bothUnbound(Binding binding, Node subject, Node
predicate, Node object, ExecutionContext execCxt, SpatialIndex spatialIndex,
QueryRewriteIndex queryRewriteIndex) {
Var subjectVar = Var.alloc(subject.getName());
Graph graph = execCxt.getActiveGraph();
+ AtomicBoolean cancel = execCxt.getCancelSignal();
//Search for both Features and Geometry in the Graph. Reliant upon
consistent usage of SpatialObject (which is base class of Feature and Geometry)
if present.
- ExtendedIterator<Binding> iterator = findSpatialObjects(graph)
+ ExtendedIterator<Binding> iterator = findSpatialObjects(cancel, graph)
.mapWith(node -> BindingFactory.binding(binding, subjectVar,
node));
QueryIter queryIter = QueryIter.flatMap(
@@ -155,8 +157,9 @@ public abstract class GenericPropertyFunction extends
PFuncSimple {
//Prepare the results.
Var unboundVar = Var.alloc(unboundNode.getName());
+ AtomicBoolean cancel = execCxt.getCancelSignal();
//Search for both Features and Geometry in the Graph. Reliant upon
consistent usage of SpatialObject (which is base class of Feature and Geometry)
if present.
- ExtendedIterator<Binding> iterator = findSpatialObjects(graph)
+ ExtendedIterator<Binding> iterator = findSpatialObjects(cancel, graph)
.mapWith(node -> BindingFactory.binding(binding, unboundVar,
node));
return QueryIter.flatMap(
@@ -169,18 +172,18 @@ public abstract class GenericPropertyFunction extends
PFuncSimple {
execCxt);
}
- private static ExtendedIterator<Node> findSpatialObjects(Graph graph) {
+ private static ExtendedIterator<Node> findSpatialObjects(AtomicBoolean
cancel, Graph graph) {
// The found nodes are passed to SpatialObjectGeometryLiteral.retrieve
which:
// - Filters out all features unless they have a
geo:hasDefaultGeometry property or wgs84 vocab.
// - Retrieves only a single specific geoLiteral for a geoResource
// There would be performance potential by leveraging the triples here
for retrieve.
- ExtendedIterator<Triple> result =
AccessGeoSPARQL.findSpecificGeoLiterals(graph);
+ ExtendedIterator<Triple> result =
AccessGeoSPARQL.findSpecificGeoLiterals(cancel, graph);
try {
- result =
result.andThen(AccessGeoSPARQL.findDefaultGeoResources(graph));
- result =
result.andThen(AccessWGS84.findGeoLiteralsAsTriples(graph, null));
+ result =
result.andThen(AccessGeoSPARQL.findDefaultGeoResources(cancel, graph));
+ result =
result.andThen(AccessWGS84.findGeoLiteralsAsTriples(cancel, graph, null));
} catch (RuntimeException t) {
result.close();
- throw new RuntimeException(t);
+ throw AccessGeoSPARQL.buildException(t);
}
return result.mapWith(Triple::getSubject);
}
@@ -254,7 +257,14 @@ public abstract class GenericPropertyFunction extends
PFuncSimple {
// Also test all Geometry of the Features. All, some or one Geometry
may have matched.
// ExtendedIterator<Node> featureGeometries = G.iterSP(graph,
featureNode, Geo.HAS_GEOMETRY_NODE);
- ExtendedIterator<Node> featureGeometries =
AccessGeoSPARQL.findSpecificGeoResources(graph,
featureNode).mapWith(Triple::getObject);
+ AtomicBoolean cancel = execCxt.getCancelSignal();
+ ExtendedIterator<Node> featureGeometries;
+ try {
+ featureGeometries =
AccessGeoSPARQL.findSpecificGeoResources(cancel, graph,
featureNode).mapWith(Triple::getObject);
+ } catch (RuntimeException e) {
+ featureIterConcat.close();
+ throw AccessGeoSPARQL.buildException(e);
+ }
QueryIterator geometriesQueryIterator = QueryIterPlainWrapper.create(
Iter.map(
Iter.filter( // omit asserted
diff --git
a/jena-geosparql/src/main/java/org/apache/jena/geosparql/implementation/access/AccessGeoSPARQL.java
b/jena-geosparql/src/main/java/org/apache/jena/geosparql/implementation/access/AccessGeoSPARQL.java
index fd6bfceaf4..45ad27eddf 100644
---
a/jena-geosparql/src/main/java/org/apache/jena/geosparql/implementation/access/AccessGeoSPARQL.java
+++
b/jena-geosparql/src/main/java/org/apache/jena/geosparql/implementation/access/AccessGeoSPARQL.java
@@ -20,12 +20,14 @@ package org.apache.jena.geosparql.implementation.access;
import java.util.Iterator;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jena.atlas.iterator.Iter;
import org.apache.jena.geosparql.implementation.vocabulary.Geo;
import org.apache.jena.graph.Graph;
import org.apache.jena.graph.Node;
import org.apache.jena.graph.Triple;
+import org.apache.jena.query.QueryCancelledException;
import org.apache.jena.system.G;
import org.apache.jena.util.iterator.ExtendedIterator;
@@ -86,40 +88,40 @@ public class AccessGeoSPARQL {
* Find all triples with geo:hasDefaultGeometry and geo:hasGeometry
predicates.
* If a feature has a default geometry, then this method will omit all its
(non-default) geometries.
*/
- public static ExtendedIterator<Triple> findSpecificGeoResources(Graph
graph) {
+ public static ExtendedIterator<Triple>
findSpecificGeoResources(AtomicBoolean cancel, Graph graph) {
// List resources that have a default geometry followed by those that
// only have a non-default one.
- ExtendedIterator<Triple> result = graph.find(null,
Geo.HAS_DEFAULT_GEOMETRY_NODE, null);
+ ExtendedIterator<Triple> result = G.find(cancel, graph, null,
Geo.HAS_DEFAULT_GEOMETRY_NODE, null);
try {
boolean hasDefaultGeometry = result.hasNext();
- ExtendedIterator<Triple> it = graph.find(null,
Geo.HAS_GEOMETRY_NODE, null);
+ ExtendedIterator<Triple> it = G.find(cancel, graph, null,
Geo.HAS_GEOMETRY_NODE, null);
// No default geometry -> no need to filter.
result = hasDefaultGeometry
? result.andThen(it.filterDrop(t -> G.hasProperty(graph,
t.getSubject(), Geo.HAS_DEFAULT_GEOMETRY_NODE)))
: result.andThen(it);
- } catch (RuntimeException t) {
+ } catch (RuntimeException e) {
result.close();
- throw new RuntimeException(t);
+ throw buildException(e);
}
return result;
}
- public static ExtendedIterator<Triple> findDefaultGeoResources(Graph
graph) {
- return graph.find(null, Geo.HAS_DEFAULT_GEOMETRY_NODE, null);
+ public static ExtendedIterator<Triple>
findDefaultGeoResources(AtomicBoolean cancel, Graph graph) {
+ return G.find(cancel, graph, null, Geo.HAS_DEFAULT_GEOMETRY_NODE,
null);
}
- public static ExtendedIterator<Triple> findSpecificGeoResources(Graph
graph, Node feature) {
+ public static ExtendedIterator<Triple>
findSpecificGeoResources(AtomicBoolean cancel, Graph graph, Node feature) {
Objects.requireNonNull(feature);
- ExtendedIterator<Triple> result = graph.find(feature,
Geo.HAS_DEFAULT_GEOMETRY_NODE, null);
+ ExtendedIterator<Triple> result = G.find(cancel, graph, feature,
Geo.HAS_DEFAULT_GEOMETRY_NODE, null);
try {
if (!result.hasNext()) {
result.close();
}
- result = graph.find(feature, Geo.HAS_GEOMETRY_NODE, null);
- } catch (RuntimeException t) {
+ result = G.find(cancel, graph, feature, Geo.HAS_GEOMETRY_NODE,
null);
+ } catch (RuntimeException e) {
result.close();
- throw new RuntimeException(t);
+ throw buildException(e);
}
return result;
}
@@ -133,9 +135,9 @@ public class AccessGeoSPARQL {
*
* If a geo:hasDefaultGeometry does not lead to a valid geo-literal there
is no backtracking to geo:hasGeometry.
*/
- public static Iterator<Triple> findSpecificGeoLiteralsByFeature(Graph
graph, Node feature) {
- return Iter.flatMap(findSpecificGeoResources(graph, feature),
- t -> findSpecificGeoLiterals(graph, t.getObject()));
+ public static Iterator<Triple>
findSpecificGeoLiteralsByFeature(AtomicBoolean cancel, Graph graph, Node
feature) {
+ return Iter.flatMap(findSpecificGeoResources(cancel, graph, feature),
+ t -> findSpecificGeoLiterals(cancel, graph, t.getObject()));
}
/**
@@ -143,24 +145,24 @@ public class AccessGeoSPARQL {
* The specific properties geo:asWKT and geo:asGML take precedence over
the more general geo:hasSerialization.
* This means if a resource has wkt and/or gml then all
geo:hasSerialization triples will be omitted for it.
*/
- public static ExtendedIterator<Triple> findSpecificGeoLiterals(Graph
graph) {
- ExtendedIterator<Triple> result = graph.find(null, Geo.AS_WKT_NODE,
null);
+ public static ExtendedIterator<Triple>
findSpecificGeoLiterals(AtomicBoolean cancel, Graph graph) {
+ ExtendedIterator<Triple> result = G.find(cancel, graph, null,
Geo.AS_WKT_NODE, null);
try {
- result = result.andThen(graph.find(null, Geo.AS_GML_NODE, null));
+ result = result.andThen(G.find(cancel, graph, null,
Geo.AS_GML_NODE, null));
// If there is no specific serialization property use the general
one.
if (!result.hasNext()) {
result.close();
- result = graph.find(null, Geo.HAS_SERIALIZATION_NODE, null);
+ result = G.find(cancel, graph, null,
Geo.HAS_SERIALIZATION_NODE, null);
} else {
// Append more general serializations for those resources that
lack a specific one.
- ExtendedIterator<Triple> it = graph.find(null,
Geo.HAS_SERIALIZATION_NODE, null).filterDrop(t ->
+ ExtendedIterator<Triple> it = G.find(cancel, graph, null,
Geo.HAS_SERIALIZATION_NODE, null).filterDrop(t ->
G.hasProperty(graph, t.getSubject(), Geo.AS_WKT_NODE) ||
G.hasProperty(graph, t.getSubject(), Geo.AS_GML_NODE));
result = result.andThen(it);
}
- } catch (RuntimeException t) {
+ } catch (RuntimeException e) {
result.close();
- throw new RuntimeException(t);
+ throw buildException(e);
}
return result;
}
@@ -170,19 +172,19 @@ public class AccessGeoSPARQL {
* The geometry resource must not be null.
* A specific serialization (WKT, GML) takes precedence over the more
general hasSerialization property.
*/
- public static ExtendedIterator<Triple> findSpecificGeoLiterals(Graph
graph, Node geometry) {
+ public static ExtendedIterator<Triple>
findSpecificGeoLiterals(AtomicBoolean cancel, Graph graph, Node geometry) {
Objects.requireNonNull(geometry);
- ExtendedIterator<Triple> result = graph.find(geometry,
Geo.AS_WKT_NODE, null);
+ ExtendedIterator<Triple> result = G.find(cancel, graph, geometry,
Geo.AS_WKT_NODE, null);
try {
- result = result.andThen(graph.find(geometry, Geo.AS_GML_NODE,
null));
+ result = result.andThen(G.find(cancel, graph, geometry,
Geo.AS_GML_NODE, null));
if (!result.hasNext()) {
result.close();
// Fallback to the more generic property.
- result = graph.find(geometry, Geo.HAS_SERIALIZATION_NODE,
null);
+ result = G.find(cancel, graph, geometry,
Geo.HAS_SERIALIZATION_NODE, null);
}
- } catch (RuntimeException t) {
+ } catch (RuntimeException e) {
result.close();
- throw new RuntimeException(t);
+ throw buildException(e);
}
return result;
}
@@ -229,4 +231,11 @@ public class AccessGeoSPARQL {
graph.contains(node, Geo.AS_GML_NODE, null);
return result;
}
+
+ public static RuntimeException buildException(RuntimeException e) {
+ if (e instanceof QueryCancelledException e2) {
+ return new QueryCancelledException(e2);
+ }
+ return new RuntimeException(e);
+ }
}
diff --git
a/jena-geosparql/src/main/java/org/apache/jena/geosparql/implementation/access/AccessWGS84.java
b/jena-geosparql/src/main/java/org/apache/jena/geosparql/implementation/access/AccessWGS84.java
index b049c77bd8..df4b130846 100644
---
a/jena-geosparql/src/main/java/org/apache/jena/geosparql/implementation/access/AccessWGS84.java
+++
b/jena-geosparql/src/main/java/org/apache/jena/geosparql/implementation/access/AccessWGS84.java
@@ -22,6 +22,7 @@ import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jena.atlas.iterator.Iter;
import org.apache.jena.datatypes.DatatypeFormatException;
@@ -60,8 +61,8 @@ public class AccessWGS84 {
/** For each matching resource, build triples of format 's geo:hasGeometry
geometryLiteral'. */
// XXX geo:hasSerialization might seem a better choice but the original
jena-geosparql implementation used geo:hasGeometry.
- public static ExtendedIterator<Triple> findGeoLiteralsAsTriples(Graph
graph, Node s) {
- return findGeoLiteralsAsTriples(graph, s, Geo.HAS_GEOMETRY_NODE);
+ public static ExtendedIterator<Triple>
findGeoLiteralsAsTriples(AtomicBoolean cancel, Graph graph, Node s) {
+ return findGeoLiteralsAsTriples(cancel, graph, s,
Geo.HAS_GEOMETRY_NODE);
}
/**
@@ -72,19 +73,19 @@ public class AccessWGS84 {
* @param p The predicate to use for creating triples. Can be chosen
freely but must not be null.
* @return Iterator of created triples (not obtained from the graph
directly).
*/
- public static ExtendedIterator<Triple> findGeoLiteralsAsTriples(Graph
graph, Node s, Node p) {
- return findGeoLiterals(graph, s).mapWith(e ->
Triple.create(e.getKey(), p, e.getValue().asNode()));
+ public static ExtendedIterator<Triple>
findGeoLiteralsAsTriples(AtomicBoolean cancel, Graph graph, Node s, Node p) {
+ return findGeoLiterals(cancel, graph, s).mapWith(e ->
Triple.create(e.getKey(), p, e.getValue().asNode()));
}
/**
* For each matching resource, build geometry literals from the cartesian
product of the WGS84 lat/long properties.
* Resources must have both properties, lat and long, to be matched by
this method.
*/
- public static ExtendedIterator<Entry<Node, GeometryWrapper>>
findGeoLiterals(Graph graph, Node s) {
+ public static ExtendedIterator<Entry<Node, GeometryWrapper>>
findGeoLiterals(AtomicBoolean cancel, Graph graph, Node s) {
// Warn about multiple lat/lon combinations only at most once per
graph.
boolean enableWarnings = false;
boolean[] loggedMultipleLatLons = { false };
- ExtendedIterator<Triple> latIt = graph.find(s,
SpatialExtension.GEO_LAT_NODE, Node.ANY);
+ ExtendedIterator<Triple> latIt = G.find(cancel, graph, s,
SpatialExtension.GEO_LAT_NODE, Node.ANY);
ExtendedIterator<Entry<Node, GeometryWrapper>> result =
WrappedIterator.create(Iter.iter(latIt).flatMap(triple -> {
Node feature = triple.getSubject();
Node lat = triple.getObject();
diff --git
a/jena-geosparql/src/main/java/org/apache/jena/geosparql/spatial/SpatialIndexFindUtils.java
b/jena-geosparql/src/main/java/org/apache/jena/geosparql/spatial/SpatialIndexFindUtils.java
index c2264faad4..082b02d609 100644
---
a/jena-geosparql/src/main/java/org/apache/jena/geosparql/spatial/SpatialIndexFindUtils.java
+++
b/jena-geosparql/src/main/java/org/apache/jena/geosparql/spatial/SpatialIndexFindUtils.java
@@ -18,6 +18,7 @@
package org.apache.jena.geosparql.spatial;
import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jena.atlas.iterator.Iter;
import org.apache.jena.atlas.iterator.IteratorCloseable;
@@ -41,15 +42,15 @@ public class SpatialIndexFindUtils {
* @param srsURI
* @return SpatialIndexItems found.
*/
- public static IteratorCloseable<SpatialIndexItem>
findIndexItems(DatasetGraph datasetGraph, String srsURI) {
+ public static IteratorCloseable<SpatialIndexItem>
findIndexItems(AtomicBoolean cancel, DatasetGraph datasetGraph, String srsURI) {
Graph defaultGraph = datasetGraph.getDefaultGraph();
- IteratorCloseable<SpatialIndexItem> itemsIter =
findIndexItems(defaultGraph, srsURI);
+ IteratorCloseable<SpatialIndexItem> itemsIter = findIndexItems(cancel,
defaultGraph, srsURI);
try {
//Named Models
Iterator<Node> graphNodeIt = datasetGraph.listGraphNodes();
Iterator<SpatialIndexItem> namedGraphItemsIt =
Iter.iter(graphNodeIt).flatMap(graphNode -> {
Graph namedGraph = datasetGraph.getGraph(graphNode);
- IteratorCloseable<SpatialIndexItem> graphItems =
findIndexItems(namedGraph, srsURI);
+ IteratorCloseable<SpatialIndexItem> graphItems =
findIndexItems(cancel, namedGraph, srsURI);
return graphItems;
});
itemsIter = Iter.iter(itemsIter).append(namedGraphItemsIt);
@@ -67,13 +68,13 @@ public class SpatialIndexFindUtils {
* @param srsURI
* @return Items found in the Model in the SRS URI.
*/
- public static final IteratorCloseable<SpatialIndexItem>
findIndexItems(Graph graph, String srsURI) {
+ public static final IteratorCloseable<SpatialIndexItem>
findIndexItems(AtomicBoolean cancel, Graph graph, String srsURI) {
IteratorCloseable<SpatialIndexItem> result;
// Only add one set of statements as a converted dataset will
duplicate the same info.
if (AccessGeoSPARQL.containsGeoLiterals(graph)) {
- result = findIndexItemsGeoSparql(graph, srsURI);
+ result = findIndexItemsGeoSparql(cancel, graph, srsURI);
} else if (AccessWGS84.containsGeoLiteralProperties(graph)) {
- result = findIndexItemsWgs84(graph, srsURI);
+ result = findIndexItemsWgs84(cancel, graph, srsURI);
} else {
result = Iter.empty();
}
@@ -86,12 +87,12 @@ public class SpatialIndexFindUtils {
* @param srsURI
* @return SpatialIndexItem items prepared for adding to SpatialIndex.
*/
- public static IteratorCloseable<SpatialIndexItem>
findIndexItemsGeoSparql(Graph graph, String srsURI) {
- Iterator<Triple> stmtIter =
AccessGeoSPARQL.findSpecificGeoResources(graph);
+ public static IteratorCloseable<SpatialIndexItem>
findIndexItemsGeoSparql(AtomicBoolean cancel, Graph graph, String srsURI) {
+ Iterator<Triple> stmtIter =
AccessGeoSPARQL.findSpecificGeoResources(cancel, graph);
IteratorCloseable<SpatialIndexItem> result =
Iter.iter(stmtIter).flatMap(stmt -> {
Node feature = stmt.getSubject();
Node geometry = stmt.getObject();
- Iterator<Triple> serializationIter =
AccessGeoSPARQL.findSpecificGeoLiterals(graph, geometry);
+ Iterator<Triple> serializationIter =
AccessGeoSPARQL.findSpecificGeoLiterals(cancel, graph, geometry);
Iterator<SpatialIndexItem> itemIter = Iter.map(serializationIter,
triple -> {
Node geometryNode = triple.getObject();
GeometryWrapper geometryWrapper =
GeometryWrapper.extract(geometryNode);
@@ -110,8 +111,8 @@ public class SpatialIndexFindUtils {
* @param srsURI
* @return Geo predicate objects prepared for adding to SpatialIndex.
*/
- public static IteratorCloseable<SpatialIndexItem>
findIndexItemsWgs84(Graph graph, String srsURI) {
- return Iter.iter(AccessWGS84.findGeoLiterals(graph, null)).map(e -> {
+ public static IteratorCloseable<SpatialIndexItem>
findIndexItemsWgs84(AtomicBoolean cancel, Graph graph, String srsURI) {
+ return Iter.iter(AccessWGS84.findGeoLiterals(cancel, graph,
null)).map(e -> {
Node feature = e.getKey();
GeometryWrapper geometryWrapper = e.getValue();
SpatialIndexItem item = makeSpatialIndexItem(feature,
geometryWrapper, srsURI);
diff --git
a/jena-geosparql/src/main/java/org/apache/jena/geosparql/spatial/index/v2/STRtreeUtils.java
b/jena-geosparql/src/main/java/org/apache/jena/geosparql/spatial/index/v2/STRtreeUtils.java
index 15e40b2754..d43bcb8c86 100644
---
a/jena-geosparql/src/main/java/org/apache/jena/geosparql/spatial/index/v2/STRtreeUtils.java
+++
b/jena-geosparql/src/main/java/org/apache/jena/geosparql/spatial/index/v2/STRtreeUtils.java
@@ -40,7 +40,7 @@ public class STRtreeUtils {
public static STRtree buildSpatialIndexTree(Graph graph, String srsURI)
throws SpatialIndexException {
try {
STRtree tree;
- IteratorCloseable<SpatialIndexItem> it =
SpatialIndexFindUtils.findIndexItems(graph, srsURI);
+ IteratorCloseable<SpatialIndexItem> it =
SpatialIndexFindUtils.findIndexItems(null, graph, srsURI);
try {
tree = buildSpatialIndexTree(it);
} finally {
diff --git
a/jena-geosparql/src/main/java/org/apache/jena/geosparql/spatial/property_functions/GenericSpatialPropertyFunction.java
b/jena-geosparql/src/main/java/org/apache/jena/geosparql/spatial/property_functions/GenericSpatialPropertyFunction.java
index a932ffc43c..18d47204d2 100644
---
a/jena-geosparql/src/main/java/org/apache/jena/geosparql/spatial/property_functions/GenericSpatialPropertyFunction.java
+++
b/jena-geosparql/src/main/java/org/apache/jena/geosparql/spatial/property_functions/GenericSpatialPropertyFunction.java
@@ -19,6 +19,7 @@ package org.apache.jena.geosparql.spatial.property_functions;
import java.util.Collection;
import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.apache.jena.atlas.iterator.Iter;
@@ -101,6 +102,7 @@ public abstract class GenericSpatialPropertyFunction
extends PFuncSimpleAndList
}
private boolean checkBound(ExecutionContext execCxt, Node subject) {
+ AtomicBoolean cancel = execCxt.getCancelSignal();
try {
Graph graph = execCxt.getActiveGraph();
@@ -108,9 +110,9 @@ public abstract class GenericSpatialPropertyFunction
extends PFuncSimpleAndList
Iterator<Triple> spatialTriples;
if (AccessGeoSPARQL.containsGeoLiterals(graph)) {
- spatialTriples =
AccessGeoSPARQL.findSpecificGeoLiteralsByFeature(graph, subject);
+ spatialTriples =
AccessGeoSPARQL.findSpecificGeoLiteralsByFeature(cancel, graph, subject);
} else if (AccessWGS84.containsGeoLiteralProperties(graph)) {
- spatialTriples = AccessWGS84.findGeoLiteralsAsTriples(graph,
subject);
+ spatialTriples = AccessWGS84.findGeoLiteralsAsTriples(cancel,
graph, subject);
} else {
spatialTriples = Iter.empty();
}
diff --git
a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java
b/jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/AbstractTestQueryExecutionCancel.java
similarity index 58%
copy from
jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java
copy to
jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/AbstractTestQueryExecutionCancel.java
index 652465dcab..73d3b7e529 100644
---
a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java
+++
b/jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/AbstractTestQueryExecutionCancel.java
@@ -16,41 +16,48 @@
* limitations under the License.
*/
-package org.apache.jena.sparql.api;
+package org.apache.jena.sparql.exec;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.IntStream;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-
import org.apache.jena.graph.Graph;
import org.apache.jena.graph.NodeFactory;
-import org.apache.jena.query.*;
+import org.apache.jena.query.ARQ;
+import org.apache.jena.query.Dataset;
+import org.apache.jena.query.Query;
+import org.apache.jena.query.QueryCancelledException;
+import org.apache.jena.query.QueryExecution;
+import org.apache.jena.query.QueryExecutionFactory;
+import org.apache.jena.query.QueryFactory;
+import org.apache.jena.query.ReadWrite;
+import org.apache.jena.query.ResultSet;
+import org.apache.jena.query.ResultSetFormatter;
import org.apache.jena.rdf.model.Model;
-import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.rdf.model.Property;
import org.apache.jena.rdf.model.Resource;
import org.apache.jena.sparql.ARQConstants;
import org.apache.jena.sparql.core.DatasetGraph;
import org.apache.jena.sparql.core.DatasetGraphFactory;
+import org.apache.jena.sparql.core.Transactional;
import org.apache.jena.sparql.engine.ExecutionContext;
import org.apache.jena.sparql.engine.iterator.QueryIteratorCheck;
import
org.apache.jena.sparql.engine.iterator.QueryIteratorCheck.OpenIteratorException;
-import org.apache.jena.sparql.exec.QueryExec;
-import org.apache.jena.sparql.exec.QueryExecBuilder;
import org.apache.jena.sparql.expr.NodeValue;
import org.apache.jena.sparql.function.FunctionBase0;
import org.apache.jena.sparql.function.FunctionEnv;
@@ -60,8 +67,15 @@ import org.apache.jena.sparql.graph.GraphFactory;
import org.apache.jena.sparql.sse.SSE;
import org.apache.jena.sparql.util.Context;
import org.apache.jena.sparql.util.Symbol;
+import org.apache.jena.system.AutoTxn;
+import org.apache.jena.system.Txn;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
-public class TestQueryExecutionCancel {
+public abstract class AbstractTestQueryExecutionCancel {
+
+ public abstract Dataset createDataset();
private static final String ns = "http://example/ns#";
@@ -79,70 +93,6 @@ public class TestQueryExecutionCancel {
@BeforeAll public static void beforeClass() {
FunctionRegistry.get().put(ns + "wait", wait.class); }
//@AfterAll public static void afterClass() {
FunctionRegistry.get().remove(ns + "wait"); }
- @Test
- public void test_Cancel_API_1()
- {
- try(QueryExecution qExec = makeQExec("SELECT * {?s ?p ?o}")) {
- ResultSet rs = qExec.execSelect();
- assertTrue(rs.hasNext());
- qExec.abort();
- assertThrows(QueryCancelledException.class,
- ()-> rs.nextSolution(),
- ()->"Results not expected
after cancel.");
- }
- }
-
- @Test
- public void test_Cancel_API_2()
- {
- try(QueryExecution qExec = makeQExec("PREFIX ex: <" + ns + "> SELECT *
{?s ?p ?o . FILTER ex:wait(100) }")) {
- ResultSet rs = qExec.execSelect();
- assertTrue(rs.hasNext());
- qExec.abort();
- assertThrows(QueryCancelledException.class,
- ()-> rs.hasNext(),
- ()->"Results not expected
after cancel.");
- }
- }
-
- @Test public void test_Cancel_API_3() throws InterruptedException
- {
- // Don't qExec.close on this thread.
- QueryExecution qExec = makeQExec("PREFIX ex: <" + ns + "> SELECT * {
?s ?p ?o . FILTER ex:wait(100) }");
- CancelThreadRunner thread = new CancelThreadRunner(qExec);
- thread.start();
- synchronized (qExec) { qExec.wait(); }
- synchronized (qExec) { qExec.abort();}
- synchronized (qExec) { qExec.notify(); }
- assertEquals (1, thread.getCount());
- }
-
- @Test public void test_Cancel_API_4() throws InterruptedException
- {
- // Don't qExec.close on this thread.
- QueryExecution qExec = makeQExec("PREFIX ex: <" + ns + "> SELECT * {
?s ?p ?o } ORDER BY ex:wait(100)");
- CancelThreadRunner thread = new CancelThreadRunner(qExec);
- thread.start();
- synchronized (qExec) { qExec.wait(); }
- synchronized (qExec) { qExec.abort(); }
- synchronized (qExec) { qExec.notify(); }
- assertEquals (1, thread.getCount());
- }
-
- @Test
- public void test_Cancel_API_5() {
- try (QueryExecution qe = QueryExecutionFactory.create("SELECT * { ?s
?p ?o }", m)) {
- qe.abort();
- assertThrows(QueryCancelledException.class, ()->
ResultSetFormatter.consume(qe.execSelect()));
- }
- }
-
- private QueryExecution makeQExec(String queryString)
- {
- Query q = QueryFactory.create(queryString);
- QueryExecution qExec = QueryExecutionFactory.create(q, m);
- return qExec;
- }
class CancelThreadRunner extends Thread
{
@@ -182,52 +132,52 @@ public class TestQueryExecutionCancel {
@Test
public void test_cancel_select_1() {
- cancellationTest("SELECT * {}", QueryExec::select);
+ cancellationTest("SELECT * {}", QueryExecution::execSelect);
}
@Test
public void test_cancel_select_2() {
- cancellationTest("SELECT * {}", QueryExec::select, Iterator::hasNext);
+ cancellationTest("SELECT * {}", QueryExecution::execSelect,
Iterator::hasNext);
}
@Test
public void test_cancel_ask() {
- cancellationTest("ASK {}", QueryExec::ask);
+ cancellationTest("ASK {}", QueryExecution::execAsk);
}
@Test
public void test_cancel_construct() {
- cancellationTest("CONSTRUCT WHERE {}", QueryExec::construct);
+ cancellationTest("CONSTRUCT WHERE {}", QueryExecution::execConstruct);
}
@Test
public void test_cancel_describe() {
- cancellationTest("DESCRIBE * {}", QueryExec::describe);
+ cancellationTest("DESCRIBE * {}", QueryExecution::execDescribe);
}
@Test
public void test_cancel_construct_dataset() {
- cancellationTest("CONSTRUCT{} WHERE{}", QueryExec::constructDataset);
+ cancellationTest("CONSTRUCT{} WHERE{}",
QueryExecution::execConstructDataset);
}
@Test
public void test_cancel_construct_triples_1() {
- cancellationTest("CONSTRUCT{} WHERE{}", QueryExec::constructTriples,
Iterator::hasNext);
+ cancellationTest("CONSTRUCT{} WHERE{}",
QueryExecution::execConstructTriples, Iterator::hasNext);
}
@Test
public void test_cancel_construct_triples_2() {
- cancellationTest("CONSTRUCT{} WHERE{}", QueryExec::constructTriples);
+ cancellationTest("CONSTRUCT{} WHERE{}",
QueryExecution::execConstructTriples);
}
@Test
public void test_cancel_construct_quads_1() {
- cancellationTest("CONSTRUCT{} WHERE{}", QueryExec::constructQuads,
Iterator::hasNext);
+ cancellationTest("CONSTRUCT{} WHERE{}",
QueryExecution::execConstructQuads, Iterator::hasNext);
}
@Test
public void test_cancel_construct_quads_2() {
- cancellationTest("CONSTRUCT{} WHERE{}", QueryExec::constructQuads);
+ cancellationTest("CONSTRUCT{} WHERE{}",
QueryExecution::execConstructQuads);
}
@Test
@@ -293,38 +243,43 @@ public class TestQueryExecutionCancel {
return fnReg;
}
- /** Create a model with 1000 triples. */
- static Graph createTestGraph() {
- Graph graph = GraphFactory.createDefaultGraph();
- IntStream.range(0, 1000)
+ static void generateTestData(Graph graph, int size) {
+ IntStream.range(0, size)
.mapToObj(i -> NodeFactory.createURI("http://www.example.org/r" +
i))
.forEach(node -> graph.add(node, node, node));
- return graph;
}
- static <T> void cancellationTest(String queryString, Function<QueryExec,
Iterator<T>> itFactory, Consumer<Iterator<T>> itConsumer) {
+ public <T> void cancellationTest(String queryString,
Function<QueryExecution, Iterator<T>> itFactory, Consumer<Iterator<T>>
itConsumer) {
cancellationTest(queryString, itFactory::apply);
cancellationTestForIterator(queryString, itFactory, itConsumer);
}
/** Abort the query exec and expect all execution methods to fail */
- static void cancellationTest(String queryString, Consumer<QueryExec>
execAction) {
- DatasetGraph dsg = DatasetGraphFactory.createTxnMem();
- dsg.add(SSE.parseQuad("(_ :s :p :o)"));
- try(QueryExec aExec =
QueryExec.dataset(dsg).query(queryString).build()) {
- aExec.abort();
- assertThrows(QueryCancelledException.class, ()->
execAction.accept(aExec));
+ public void cancellationTest(String queryString, Consumer<QueryExecution>
execAction) {
+ Dataset ds = createDataset();
+ try (AutoTxn txn = Txn.autoTxn(ds, ReadWrite.WRITE)) {
+ ds.asDatasetGraph().add(SSE.parseQuad("(_ :s :p :o)"));
+ txn.commit();
+ }
+ try (AutoTxn txn = Txn.autoTxn(ds, ReadWrite.READ);
+ QueryExecution exec =
QueryExecution.dataset(ds).query(queryString).build()) {
+ exec.abort();
+ assertThrows(QueryCancelledException.class, ()->
execAction.accept(exec));
}
}
/** Obtain an iterator and only afterwards abort the query exec.
* Operations on the iterator are now expected to fail. */
- static <T> void cancellationTestForIterator(String queryString,
Function<QueryExec, Iterator<T>> itFactory, Consumer<Iterator<T>> itConsumer) {
- DatasetGraph dsg = DatasetGraphFactory.createTxnMem();
- dsg.add(SSE.parseQuad("(_ :s :p :o)"));
- try(QueryExec aExec =
QueryExec.dataset(dsg).query(queryString).build()) {
- Iterator<T> it = itFactory.apply(aExec);
- aExec.abort();
+ public <T> void cancellationTestForIterator(String queryString,
Function<QueryExecution, Iterator<T>> itFactory, Consumer<Iterator<T>>
itConsumer) {
+ Dataset ds = createDataset();
+ try (AutoTxn txn = Txn.autoTxn(ds, ReadWrite.WRITE)) {
+ ds.asDatasetGraph().add(SSE.parseQuad("(_ :s :p :o)"));
+ txn.commit();
+ }
+ try (AutoTxn txn = Txn.autoTxn(ds, ReadWrite.READ);
+ QueryExecution exec =
QueryExecution.dataset(ds).query(queryString).build()) {
+ Iterator<T> it = itFactory.apply(exec);
+ exec.abort();
assertThrows(QueryCancelledException.class, ()->
itConsumer.accept(it));
}
}
@@ -335,7 +290,7 @@ public class TestQueryExecutionCancel {
@Timeout(value = 10000, unit=TimeUnit.MILLISECONDS)
public void test_cancel_concurrent_1() {
// Create a query that creates 3 cross joins - resulting in one
billion result rows.
- test_cancel_concurrent("SELECT * { ?a ?b ?c . ?d ?e ?f . ?g ?h ?i .
}");
+ test_cancel_concurrent(1000, "SELECT * { ?a ?b ?c . ?d ?e ?f . ?g ?h
?i . }");
}
@Test
@@ -343,10 +298,30 @@ public class TestQueryExecutionCancel {
public void test_cancel_concurrent_2() {
// Create a query that creates 3 cross joins - resulting in one
billion result rows.
// Tests against additional operators, namely UNION and BIND.
- test_cancel_concurrent("SELECT * { { ?a ?b ?c . ?d ?e ?f . ?g ?h ?i .
} UNION { BIND('x' AS ?x) } }");
+ test_cancel_concurrent(1000, "SELECT * { { ?a ?b ?c . ?d ?e ?f . ?g ?h
?i . } UNION { BIND('x' AS ?x) } }");
+ }
+
+ @Test
+ @Timeout(value = 10000, unit=TimeUnit.MILLISECONDS)
+ public void test_cancel_concurrent_3() {
+ test_cancel_concurrent(
+ 1000,
+ """
+ SELECT * {
+ ?s ?p ?o
+ {
+ SELECT * {
+ ?x ?y ?z
+ }
+ LIMIT 1000000
+ }
+ ?s ?p ?o
+ }
+ LIMIT 1000000
+ """);
}
- private static void test_cancel_concurrent(String queryString) {
+ private void test_cancel_concurrent(int testDataSize, String queryString) {
int maxCancelDelayInMillis = 100;
int cpuCount = Runtime.getRuntime().availableProcessors();
@@ -354,25 +329,26 @@ public class TestQueryExecutionCancel {
int taskCount = cpuCount * 10;
// Create a model with 1000 triples
- Model model = ModelFactory.createModelForGraph(createTestGraph());
+ Dataset ds = createDataset();
+ try (AutoTxn txn = Txn.autoTxn(ds, ReadWrite.WRITE)) {
+ generateTestData(ds.asDatasetGraph().getDefaultGraph(),
testDataSize);
+ ds.commit();
+ }
Query query = QueryFactory.create(queryString);
- Callable<QueryExecution> qeFactory = () ->
QueryExecutionFactory.create(query, model);
-
- runConcurrentAbort(taskCount, maxCancelDelayInMillis, qeFactory,
TestQueryExecutionCancel::doCount);
+ Callable<QueryExecution> qeFactory = () ->
QueryExecutionFactory.create(query, ds);
+ runConcurrentAbort(taskCount, maxCancelDelayInMillis, ds, qeFactory,
AbstractTestQueryExecutionCancel::doCount);
}
private static final int doCount(QueryExecution qe) {
- try (QueryExecution qe2 = qe) {
- ResultSet rs = qe2.execSelect();
- int size = ResultSetFormatter.consume(rs);
- return size;
- }
+ ResultSet rs = qe.execSelect();
+ int size = ResultSetFormatter.consume(rs);
+ return size;
}
/** Reusable method that creates a parallel stream that starts query
executions
* and schedules cancel tasks on a separate thread pool. */
- public static void runConcurrentAbort(int taskCount, int maxCancelDelay,
Callable<QueryExecution> qeFactory, Function<QueryExecution, ?> processor) {
+ public static void runConcurrentAbort(int taskCount, int maxCancelDelay,
Transactional ds, Callable<QueryExecution> qeFactory, Function<QueryExecution,
?> processor) {
Random cancelDelayRandom = new Random();
ExecutorService executorService = Executors.newCachedThreadPool();
try {
@@ -380,46 +356,58 @@ public class TestQueryExecutionCancel {
list
.parallelStream()
.forEach(i -> {
- QueryExecution qe;
- try {
- qe = qeFactory.call();
- } catch (Exception e) {
- throw new RuntimeException("Failed to build a query
execution", e);
- }
-
- // Fail if any iterators are not properly closed
- qe.getContext().set(QueryIteratorCheck.failOnOpenIterator,
true);
-
- Future<?> future = executorService.submit(() ->
processor.apply(qe));
- int delayToAbort =
cancelDelayRandom.nextInt(maxCancelDelay);
- try {
- Thread.sleep(delayToAbort);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- // System.out.println("Abort: " + qe);
- qe.abort();
- try {
- // System.out.println("Waiting for: " + qe);
- future.get();
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- if (!(cause instanceof QueryCancelledException)) {
- // Unexpected exception - print out the stack trace
- e.printStackTrace();
+ try (AutoTxn txn = Txn.autoTxn(ds, ReadWrite.READ);
+ QueryExecution qe = qeFactory.call()) {
+ // Fail if any iterators are not properly closed
+
qe.getContext().set(QueryIteratorCheck.failOnOpenIterator, true);
+
+ // Schedule the concurrent abort action with random
delay.
+ int delayToAbort =
cancelDelayRandom.nextInt(maxCancelDelay);
+ boolean[] abortDone = {false};
+ Future<?> future = executorService.submit(() -> {
+ try {
+ Thread.sleep(delayToAbort);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ // System.out.println("Abort: " + qe);
+ abortDone[0] = true;
+ qe.abort();
+ });
+
+ // Meanwhile start the query execution.
+ try {
+ processor.apply(qe);
+ } catch (Throwable e) {
+ if (!(e instanceof QueryCancelledException)) {
+ // Unexpected exception - print out the stack
trace
+ e.printStackTrace();
+ }
+ assertEquals(QueryCancelledException.class,
e.getClass());
+
+ boolean hasOpenIterators =
Arrays.stream(e.getSuppressed())
+ .anyMatch(x -> x instanceof
OpenIteratorException);
+ if (hasOpenIterators) {
+ throw new RuntimeException("Encountered open
iterators.", e);
+ }
}
- assertEquals(QueryCancelledException.class,
cause.getClass());
- boolean hasOpenIterators =
Arrays.stream(cause.getSuppressed())
- .anyMatch(x -> x instanceof
OpenIteratorException);
- if (hasOpenIterators) {
- throw new RuntimeException("Encountered open
iterators.", e);
+ // The query has completed. Cancel the abort thread if
it hasn't done so yet.
+ if (!abortDone[0]) {
+ future.cancel(true);
+ }
+ try {
+ future.get();
+ } catch (CancellationException e) {
+ // In this test setup, it is an error for a query
to complete before abort gets called.
+ throw new RuntimeException("Query completed too
early", e);
+ } catch (Exception e) {
+ // Should not happen.
+ throw new RuntimeException(e);
}
- } catch (InterruptedException e) {
- // Ignored
- } finally {
- // System.out.println("Completed: " + qe);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
});
} finally {
diff --git
a/jena-arq/src/main/java/org/apache/jena/query/QueryCancelledException.java
b/jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/TS_QueryExecutionCancel.java
similarity index 71%
copy from
jena-arq/src/main/java/org/apache/jena/query/QueryCancelledException.java
copy to
jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/TS_QueryExecutionCancel.java
index e1361cb11c..faebe62ac7 100644
--- a/jena-arq/src/main/java/org/apache/jena/query/QueryCancelledException.java
+++
b/jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/TS_QueryExecutionCancel.java
@@ -16,10 +16,16 @@
* limitations under the License.
*/
-package org.apache.jena.query;
+package org.apache.jena.sparql.exec;
-/** Indicate that a query execution has been cancelled and the operation can't
be called */
-public class QueryCancelledException extends QueryExecException
-{
- public QueryCancelledException() {}
+import org.junit.platform.suite.api.SelectClasses;
+import org.junit.platform.suite.api.Suite;
+
+@Suite
+@SelectClasses({
+ TestQueryExecutionCancel_ARQ.class,
+ TestQueryExecutionCancel_TDB2.class,
+ TestQueryExecutionCancel_TDB2_Text.class
+ })
+public class TS_QueryExecutionCancel {
}
diff --git
a/jena-arq/src/main/java/org/apache/jena/query/QueryCancelledException.java
b/jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/TestQueryExecutionCancel_ARQ.java
similarity index 72%
copy from
jena-arq/src/main/java/org/apache/jena/query/QueryCancelledException.java
copy to
jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/TestQueryExecutionCancel_ARQ.java
index e1361cb11c..08f5fa5506 100644
--- a/jena-arq/src/main/java/org/apache/jena/query/QueryCancelledException.java
+++
b/jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/TestQueryExecutionCancel_ARQ.java
@@ -16,10 +16,16 @@
* limitations under the License.
*/
-package org.apache.jena.query;
+package org.apache.jena.sparql.exec;
-/** Indicate that a query execution has been cancelled and the operation can't
be called */
-public class QueryCancelledException extends QueryExecException
+import org.apache.jena.query.Dataset;
+import org.apache.jena.query.DatasetFactory;
+
+public class TestQueryExecutionCancel_ARQ
+ extends AbstractTestQueryExecutionCancel
{
- public QueryCancelledException() {}
+ @Override
+ public Dataset createDataset() {
+ return DatasetFactory.create();
+ }
}
diff --git
a/jena-arq/src/main/java/org/apache/jena/query/QueryCancelledException.java
b/jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/TestQueryExecutionCancel_TDB2.java
similarity index 72%
copy from
jena-arq/src/main/java/org/apache/jena/query/QueryCancelledException.java
copy to
jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/TestQueryExecutionCancel_TDB2.java
index e1361cb11c..856bb253d7 100644
--- a/jena-arq/src/main/java/org/apache/jena/query/QueryCancelledException.java
+++
b/jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/TestQueryExecutionCancel_TDB2.java
@@ -16,10 +16,16 @@
* limitations under the License.
*/
-package org.apache.jena.query;
+package org.apache.jena.sparql.exec;
-/** Indicate that a query execution has been cancelled and the operation can't
be called */
-public class QueryCancelledException extends QueryExecException
+import org.apache.jena.query.Dataset;
+import org.apache.jena.tdb2.TDB2Factory;
+
+public class TestQueryExecutionCancel_TDB2
+ extends AbstractTestQueryExecutionCancel
{
- public QueryCancelledException() {}
+ @Override
+ public Dataset createDataset() {
+ return TDB2Factory.createDataset();
+ }
}
diff --git
a/jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/TestQueryExecutionCancel_TDB2_Text.java
b/jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/TestQueryExecutionCancel_TDB2_Text.java
new file mode 100644
index 0000000000..e188ecdc18
--- /dev/null
+++
b/jena-integration-tests/src/test/java/org/apache/jena/sparql/exec/TestQueryExecutionCancel_TDB2_Text.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.exec;
+
+import org.apache.jena.query.Dataset;
+import org.apache.jena.query.DatasetFactory;
+import org.apache.jena.rdf.model.Model;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFParser;
+
+public class TestQueryExecutionCancel_TDB2_Text
+ extends AbstractTestQueryExecutionCancel
+{
+ @Override
+ public Dataset createDataset() {
+ String spec = """
+ prefix : <http://www.example.org/>
+ prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+ prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>
+ prefix ja: <http://jena.hpl.hp.com/2005/11/Assembler#>
+ prefix tdb2: <http://jena.apache.org/2016/tdb#>
+ prefix text: <http://jena.apache.org/text#>
+
+ :text_dataset rdf:type text:TextDataset ;
+ text:dataset :dataset_tdb2 ;
+ text:index :indexLucene ;
+ .
+
+ :indexLucene a text:TextIndexLucene ;
+ text:directory "mem" ;
+ text:entityMap :entMap ;
+ .
+
+ :entMap a text:EntityMap ;
+ text:entityField "uri" ;
+ text:defaultField "rdfs-label" ;
+ text:uidField "uid" ;
+ text:map (
+ [ text:field "rdfs-label" ; text:predicate rdfs:label ]
+ )
+ .
+
+ :dataset_tdb2 rdf:type tdb2:DatasetTDB2 ;
+ tdb2:location "--mem--" ;
+ ja:context [ ja:cxtName "arq:queryTimeout" ; ja:cxtValue "1000" ] ;
+ .
+ """;
+ Model specModel = RDFParser.fromString(spec, Lang.TURTLE).toModel();
+
+ Dataset ds = DatasetFactory.assemble(specModel, "http://www.example.org/text_dataset");
+ return ds;
+ }
+}
diff --git a/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/OpExecutorTDB1.java b/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/OpExecutorTDB1.java
index b41409d9f4..5b1fb5f829 100644
--- a/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/OpExecutorTDB1.java
+++ b/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/OpExecutorTDB1.java
@@ -34,6 +34,7 @@ import org.apache.jena.sparql.core.Substitute;
import org.apache.jena.sparql.core.Var;
import org.apache.jena.sparql.engine.ExecutionContext;
import org.apache.jena.sparql.engine.QueryIterator;
+import org.apache.jena.sparql.engine.iterator.QueryIterFailed;
import org.apache.jena.sparql.engine.iterator.QueryIterPeek;
import org.apache.jena.sparql.engine.main.OpExecutor;
import org.apache.jena.sparql.engine.main.OpExecutorFactory;
@@ -188,7 +189,11 @@ public class OpExecutorTDB1 extends OpExecutor
{
QueryIterPeek peek = QueryIterPeek.create(input, execCxt);
input = peek; // Must pass on
- pattern = reorder(pattern, peek, transform);
+ try {
+ pattern = reorder(pattern, peek, transform);
+ } catch (Exception e) {
+ return new QueryIterFailed(input, execCxt, e);
+ }
}
}
if ( exprs == null ) {
@@ -208,9 +213,6 @@ public class OpExecutorTDB1 extends OpExecutor
Node gn, BasicPattern bgp,
ExprList exprs,
ExecutionContext execCxt)
{
- if ( ! input.hasNext() )
- return input;
-
// ---- Graph names with special meaning.
gn = decideGraphNode(gn, execCxt);
@@ -226,7 +228,18 @@ public class OpExecutorTDB1 extends OpExecutor
{
QueryIterPeek peek = QueryIterPeek.create(input, execCxt);
input = peek; // Original input now invalid.
- bgp = reorder(bgp, peek, transform);
+ try {
+ bgp = reorder(bgp, peek, transform);
+ } catch (Exception e) {
+ return new QueryIterFailed(input, execCxt, e);
+ }
+ }
+ } else {
+ try {
+ if ( ! input.hasNext() )
+ return input;
+ } catch (Exception e) {
+ return new QueryIterFailed(input, execCxt, e);
}
}
diff --git a/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/PatternMatchTDB1.java b/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/PatternMatchTDB1.java
index 5d1bc388a4..d9597d729f 100644
--- a/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/PatternMatchTDB1.java
+++ b/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/PatternMatchTDB1.java
@@ -117,7 +117,7 @@ public class PatternMatchTDB1 {
// RDF-star SA
chain = matchQuadPattern(chain, graphNode, triple, nodeTupleTable, patternTuple, anyGraph, filter, execCxt);
- chain = makeAbortable(chain, killList);
+ chain = makeAbortable(chain, killList, execCxt.getCancelSignal());
}
Iterator<Binding> iterBinding = SolverLibTDB.convertToNodes(chain, nodeTable);
diff --git a/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/SolverLibTDB.java b/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/SolverLibTDB.java
index b68ac343b9..975b049daf 100644
--- a/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/SolverLibTDB.java
+++ b/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/SolverLibTDB.java
@@ -148,16 +148,13 @@ public class SolverLibTDB
List<Abortable> killList = new ArrayList<>();
Iterator<Tuple<NodeId>> iter1 = ds.getQuadTable().getNodeTupleTable().find(NodeId.NodeIdAny, NodeId.NodeIdAny, NodeId.NodeIdAny, NodeId.NodeIdAny);
+ iter1 = makeAbortable(iter1, killList, execCxt.getCancelSignal());
+
if ( filter != null )
iter1 = Iter.filter(iter1, filter);
Iterator<NodeId> iter2 = Iter.map(iter1, t -> t.get(0));
- // Project is cheap - don't brother wrapping iter1
- iter2 = makeAbortable(iter2, killList);
-
Iterator<NodeId> iter3 = Iter.distinct(iter2);
- iter3 = makeAbortable(iter3, killList);
-
Iterator<Node> iter4 = NodeLib.nodes(ds.getQuadTable().getNodeTupleTable().getNodeTable(), iter3);
final Var var = Var.alloc(graphNode);
diff --git a/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/SolverRX.java b/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/SolverRX.java
index d646722ee0..e956878200 100644
--- a/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/SolverRX.java
+++ b/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/SolverRX.java
@@ -23,6 +23,7 @@ import static org.apache.jena.sparql.engine.main.solver.SolverLib.tripleHasEmbTr
import static org.apache.jena.tdb1.solver.SolverLibTDB.convFromBinding;
import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -37,6 +38,7 @@ import org.apache.jena.sparql.core.Substitute;
import org.apache.jena.sparql.engine.ExecutionContext;
import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.engine.binding.BindingFactory;
+import org.apache.jena.sparql.engine.iterator.IterAbortable;
import org.apache.jena.sparql.engine.main.solver.SolverRX4;
import org.apache.jena.tdb1.lib.TupleLib;
import org.apache.jena.tdb1.store.NodeId;
@@ -113,6 +115,11 @@ public class SolverRX {
// -- DRY/StageMatchTuple ??
Iterator<Tuple<NodeId>> iterMatches = nodeTupleTable.find(patternTupleId);
+
+ // Add cancel
+ AtomicBoolean cancelSignal = execCxt.getCancelSignal();
+ iterMatches = IterAbortable.wrap(iterMatches, cancelSignal);
+
// Add filter
if ( filter != null )
iterMatches = Iter.filter(iterMatches, filter);
diff --git a/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/StageMatchTuple.java b/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/StageMatchTuple.java
index 2ab35b13f5..a1c41566d3 100644
--- a/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/StageMatchTuple.java
+++ b/jena-tdb1/src/main/java/org/apache/jena/tdb1/solver/StageMatchTuple.java
@@ -29,9 +29,9 @@ import org.apache.jena.atlas.lib.StrUtils;
import org.apache.jena.atlas.lib.tuple.Tuple;
import org.apache.jena.atlas.lib.tuple.TupleFactory;
import org.apache.jena.graph.Node;
-import org.apache.jena.query.QueryCancelledException;
import org.apache.jena.sparql.core.Var;
import org.apache.jena.sparql.engine.ExecutionContext;
+import org.apache.jena.sparql.engine.iterator.IterAbortable;
import org.apache.jena.tdb1.store.NodeId;
import org.apache.jena.tdb1.store.nodetable.NodeTable;
import org.apache.jena.tdb1.store.nodetupletable.NodeTupleTable;
@@ -72,14 +72,7 @@ class StageMatchTuple {
// Add cancel check.
AtomicBoolean cancelSignal = execCxt.getCancelSignal();
- if (cancelSignal != null) {
- iterMatches = Iter.map(iterMatches, x -> {
- if (cancelSignal.get()) {
- throw new QueryCancelledException();
- }
- return x;
- });
- }
+ iterMatches = IterAbortable.wrap(iterMatches, cancelSignal);
// ** Allow a triple or quad filter here.
if ( filter != null )
diff --git a/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/OpExecutorTDB2.java b/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/OpExecutorTDB2.java
index 06f091bf07..6cce197e4d 100644
--- a/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/OpExecutorTDB2.java
+++ b/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/OpExecutorTDB2.java
@@ -34,6 +34,7 @@ import org.apache.jena.sparql.core.Substitute;
import org.apache.jena.sparql.core.Var;
import org.apache.jena.sparql.engine.ExecutionContext;
import org.apache.jena.sparql.engine.QueryIterator;
+import org.apache.jena.sparql.engine.iterator.QueryIterFailed;
import org.apache.jena.sparql.engine.iterator.QueryIterPeek;
import org.apache.jena.sparql.engine.main.OpExecutor;
import org.apache.jena.sparql.engine.main.OpExecutorFactory;
@@ -191,9 +192,6 @@ public class OpExecutorTDB2 extends OpExecutor
BasicPattern pattern,
ExprList exprs,
ExecutionContext execCxt)
{
- if ( !input.hasNext() )
- return input;
-
// -- Input
// Must pass this iterator into the next stage.
if ( pattern.size() >= 2 ) {
@@ -202,7 +200,18 @@ public class OpExecutorTDB2 extends OpExecutor
if ( transform != null ) {
QueryIterPeek peek = QueryIterPeek.create(input, execCxt);
input = peek; // Must pass on
- pattern = reorder(pattern, peek, transform);
+ try {
+ pattern = reorder(pattern, peek, transform);
+ } catch (Exception e) {
+ return new QueryIterFailed(input, execCxt, e);
+ }
+ }
+ } else {
+ try {
+ if ( !input.hasNext() )
+ return input;
+ } catch (Exception e) {
+ return new QueryIterFailed(input, execCxt, e);
}
}
@@ -223,9 +232,6 @@ public class OpExecutorTDB2 extends OpExecutor
Node gn, BasicPattern bgp,
ExprList exprs,
ExecutionContext execCxt)
{
- if ( !input.hasNext() )
- return input;
-
// ---- Graph names with special meaning.
gn = decideGraphNode(gn, execCxt);
@@ -239,7 +245,18 @@ public class OpExecutorTDB2 extends OpExecutor
if ( transform != null ) {
QueryIterPeek peek = QueryIterPeek.create(input, execCxt);
input = peek; // Original input now invalid.
- bgp = reorder(bgp, peek, transform);
+ try {
+ bgp = reorder(bgp, peek, transform);
+ } catch (Exception e) {
+ return new QueryIterFailed(input, execCxt, e);
+ }
+ }
+ } else {
+ try {
+ if ( !input.hasNext() )
+ return input;
+ } catch (Exception e) {
+ return new QueryIterFailed(input, execCxt, e);
}
}
diff --git a/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/PatternMatchTDB2.java b/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/PatternMatchTDB2.java
index a58524b068..05e3a3d7d4 100644
--- a/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/PatternMatchTDB2.java
+++ b/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/PatternMatchTDB2.java
@@ -117,7 +117,7 @@ public class PatternMatchTDB2 {
// RDF-star SA
chain = matchQuadPattern(chain, graphNode, triple, nodeTupleTable, patternTuple, anyGraph, filter, execCxt);
- chain = makeAbortable(chain, killList);
+ chain = makeAbortable(chain, killList, execCxt.getCancelSignal());
}
Iterator<Binding> iterBinding = SolverLibTDB.convertToNodes(chain, nodeTable);
diff --git a/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/SolverLibTDB.java b/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/SolverLibTDB.java
index 1bcad08586..b37cebe6de 100644
--- a/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/SolverLibTDB.java
+++ b/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/SolverLibTDB.java
@@ -193,12 +193,12 @@ public class SolverLibTDB
distinctMode = GraphNamesDistinctMode.FULL;
}
+ iter1 = makeAbortable(iter1, killList, execCxt.getCancelSignal());
+
if ( filter != null )
iter1 = Iter.filter(iter1, filter);
Iterator<NodeId> iter2 = Iter.map(iter1, t -> t.get(0));
- // Project is cheap - don't brother wrapping iter1
- iter2 = makeAbortable(iter2, killList);
// Apply the necessary distinct calculation (if any)
Iterator<NodeId> iter3;
@@ -214,7 +214,6 @@ public class SolverLibTDB
iter3 = iter2;
break;
}
- iter3 = makeAbortable(iter3, killList);
Iterator<Node> iter4 = NodeLib.nodes(ds.getQuadTable().getNodeTupleTable().getNodeTable(), iter3);
diff --git a/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/SolverRX.java b/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/SolverRX.java
index 8f7b4636b8..8464789fa5 100644
--- a/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/SolverRX.java
+++ b/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/SolverRX.java
@@ -23,6 +23,7 @@ import static org.apache.jena.sparql.engine.main.solver.SolverLib.tripleHasEmbTr
import static org.apache.jena.tdb2.solver.SolverLibTDB.convFromBinding;
import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -37,6 +38,7 @@ import org.apache.jena.sparql.core.Substitute;
import org.apache.jena.sparql.engine.ExecutionContext;
import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.engine.binding.BindingFactory;
+import org.apache.jena.sparql.engine.iterator.IterAbortable;
import org.apache.jena.sparql.engine.main.solver.SolverRX4;
import org.apache.jena.tdb2.lib.TupleLib;
import org.apache.jena.tdb2.store.NodeId;
@@ -112,6 +114,11 @@ public class SolverRX {
return Iter.nullIterator();
// -- DRY/StageMatchTuple
Iterator<Tuple<NodeId>> iterMatches = nodeTupleTable.find(patternTupleId);
+
+ // Add cancel
+ AtomicBoolean cancelSignal = execCxt.getCancelSignal();
+ iterMatches = IterAbortable.wrap(iterMatches, cancelSignal);
+
// Add filter
if ( filter != null )
iterMatches = Iter.filter(iterMatches, filter);
diff --git a/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/StageMatchTuple.java b/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/StageMatchTuple.java
index 3318b18390..9f199d2c80 100644
--- a/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/StageMatchTuple.java
+++ b/jena-tdb2/src/main/java/org/apache/jena/tdb2/solver/StageMatchTuple.java
@@ -29,9 +29,9 @@ import org.apache.jena.atlas.lib.StrUtils;
import org.apache.jena.atlas.lib.tuple.Tuple;
import org.apache.jena.atlas.lib.tuple.TupleFactory;
import org.apache.jena.graph.Node;
-import org.apache.jena.query.QueryCancelledException;
import org.apache.jena.sparql.core.Var;
import org.apache.jena.sparql.engine.ExecutionContext;
+import org.apache.jena.sparql.engine.iterator.IterAbortable;
import org.apache.jena.tdb2.store.NodeId;
import org.apache.jena.tdb2.store.nodetable.NodeTable;
import org.apache.jena.tdb2.store.nodetupletable.NodeTupleTable;
@@ -72,14 +72,7 @@ class StageMatchTuple {
// Add cancel check.
AtomicBoolean cancelSignal = execCxt.getCancelSignal();
- if (cancelSignal != null) {
- iterMatches = Iter.map(iterMatches, x -> {
- if (cancelSignal.get()) {
- throw new QueryCancelledException();
- }
- return x;
- });
- }
+ iterMatches = IterAbortable.wrap(iterMatches, cancelSignal);
// ** Allow a triple or quad filter here.
if ( filter != null )
diff --git a/jena-text/src/main/java/org/apache/jena/query/text/DatasetGraphText.java b/jena-text/src/main/java/org/apache/jena/query/text/DatasetGraphText.java
index 1b5e42a9ce..992999b77e 100644
--- a/jena-text/src/main/java/org/apache/jena/query/text/DatasetGraphText.java
+++ b/jena-text/src/main/java/org/apache/jena/query/text/DatasetGraphText.java
@@ -186,9 +186,11 @@ public class DatasetGraphText extends DatasetGraphTextMonitor implements Transac
*/
@Override
public void abort() {
- super.getMonitor().finish();
- abortAction.run();
- readWriteMode.set(null);
+ synchronized (txnExitLock) {
+ super.getMonitor().finish();
+ abortAction.run();
+ readWriteMode.set(null);
+ }
}
private void commit_R() {