This is an automated email from the ASF dual-hosted git repository.
andy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/jena.git
The following commit(s) were added to refs/heads/main by this push:
new 6313965ac0 GH-2821: Support for abort and timeouts on updates.
6313965ac0 is described below
commit 6313965ac01658dc211b4234b47860ab9ce51274
Author: Claus Stadler <[email protected]>
AuthorDate: Tue Nov 5 18:02:59 2024 +0100
GH-2821: Support for abort and timeouts on updates.
---
.../main/java/org/apache/jena/http/HttpLib.java | 14 ++
.../org/apache/jena/http/sys/ExecHTTPBuilder.java | 11 --
.../jena/http/sys/ExecUpdateHTTPBuilder.java | 34 ++--
.../src/main/java/org/apache/jena/query/ARQ.java | 11 ++
.../jena/sparql/engine/ExecutionContext.java | 15 +-
.../org/apache/jena/sparql/engine/Timeouts.java | 195 +++++++++++++++++++++
.../engine/iterator/QueryIterProcessBinding.java | 3 +-
.../sparql/engine/iterator/QueryIteratorBase.java | 2 +-
.../apache/jena/sparql/exec/QueryExecDataset.java | 35 ++--
.../jena/sparql/exec/QueryExecDatasetBuilder.java | 62 ++-----
.../apache/jena/sparql/exec/UpdateExecAdapter.java | 11 ++
.../apache/jena/sparql/exec/UpdateExecBuilder.java | 4 +
.../jena/sparql/exec/UpdateExecBuilderAdapter.java | 7 +
.../apache/jena/sparql/exec/UpdateExecDataset.java | 5 +-
.../jena/sparql/exec/UpdateExecDatasetBuilder.java | 51 +++---
.../jena/sparql/exec/UpdateExecutionAdapter.java | 10 ++
.../sparql/exec/UpdateExecutionBuilderAdapter.java | 8 +
.../jena/sparql/exec/http/QueryExecHTTP.java | 2 +-
.../jena/sparql/exec/http/UpdateExecHTTP.java | 57 ++++--
.../sparql/exec/http/UpdateExecHTTPBuilder.java | 2 +-
.../exec/http/UpdateExecutionHTTPBuilder.java | 3 +-
.../apache/jena/sparql/modify/UpdateEngine.java | 6 +-
.../jena/sparql/modify/UpdateEngineBase.java | 21 +--
.../jena/sparql/modify/UpdateEngineMain.java | 20 +--
.../jena/sparql/modify/UpdateEngineWorker.java | 66 ++++++-
.../jena/sparql/modify/UpdateProcessorBase.java | 38 +++-
.../java/org/apache/jena/sparql/util/Context.java | 22 +++
.../apache/jena/update/UpdateExecutionBuilder.java | 6 +
.../jena/update/UpdateExecutionDatasetBuilder.java | 8 +
.../org/apache/jena/update/UpdateProcessor.java | 8 +
.../jena/sparql/api/TestQueryExecutionCancel.java | 96 ++++++++--
.../org/apache/jena/sparql/api/TestTimeouts.java | 61 +++++++
.../jena/sparql/api/TestUpdateExecutionCancel.java | 176 +++++++++++++++++++
.../fuseki/main/TestSPARQLProtocolTimeout.java | 64 +++++++
.../rdfconnection/TestRDFConnectionRemote.java | 24 ++-
35 files changed, 963 insertions(+), 195 deletions(-)
diff --git a/jena-arq/src/main/java/org/apache/jena/http/HttpLib.java
b/jena-arq/src/main/java/org/apache/jena/http/HttpLib.java
index d141eada0c..530533b845 100644
--- a/jena-arq/src/main/java/org/apache/jena/http/HttpLib.java
+++ b/jena-arq/src/main/java/org/apache/jena/http/HttpLib.java
@@ -244,7 +244,21 @@ public class HttpLib {
* @return String
*/
public static String handleResponseRtnString(HttpResponse<InputStream>
response) {
+ return handleResponseRtnString(response, null);
+ }
+
+ /**
+ * Handle the HTTP response and read the body to produce a string if a 200.
+ * Otherwise, throw an {@link HttpException}.
+ * @param response
+ * @param callback A callback that receives the opened input stream.
+ * @return String
+ */
+ public static String handleResponseRtnString(HttpResponse<InputStream>
response, Consumer<InputStream> callback) {
InputStream input = handleResponseInputStream(response);
+ if (callback != null) {
+ callback.accept(input);
+ }
try {
return IO.readWholeFileAsUTF8(input);
} catch (RuntimeIOException e) { throw new HttpException(e); }
diff --git
a/jena-arq/src/main/java/org/apache/jena/http/sys/ExecHTTPBuilder.java
b/jena-arq/src/main/java/org/apache/jena/http/sys/ExecHTTPBuilder.java
index c5b6718d27..9c3b032d44 100644
--- a/jena-arq/src/main/java/org/apache/jena/http/sys/ExecHTTPBuilder.java
+++ b/jena-arq/src/main/java/org/apache/jena/http/sys/ExecHTTPBuilder.java
@@ -280,31 +280,20 @@ public abstract class ExecHTTPBuilder<X, Y> {
public Y context(Context context) {
if ( context == null )
return thisBuilder();
- ensureContext();
contextAcc.context(context);
- //this.context.putAll(context);
return thisBuilder();
}
public Y set(Symbol symbol, Object value) {
- ensureContext();
contextAcc.set(symbol, value);
- //context.set(symbol, value);
return thisBuilder();
}
public Y set(Symbol symbol, boolean value) {
- ensureContext();
contextAcc.set(symbol, value);
- //context.set(symbol, value);
return thisBuilder();
}
- private void ensureContext() {
-// if ( context == null )
-// context = new Context();
- }
-
/**
* Set a timeout of the overall operation.
* Time-to-connect can be set with a custom {@link HttpClient} - see
{@link java.net.http.HttpClient.Builder#connectTimeout(java.time.Duration)}.
diff --git
a/jena-arq/src/main/java/org/apache/jena/http/sys/ExecUpdateHTTPBuilder.java
b/jena-arq/src/main/java/org/apache/jena/http/sys/ExecUpdateHTTPBuilder.java
index 19bbe0f801..22e551c4ce 100644
--- a/jena-arq/src/main/java/org/apache/jena/http/sys/ExecUpdateHTTPBuilder.java
+++ b/jena-arq/src/main/java/org/apache/jena/http/sys/ExecUpdateHTTPBuilder.java
@@ -20,6 +20,7 @@ package org.apache.jena.http.sys;
import java.net.http.HttpClient;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.jena.graph.Node;
@@ -31,6 +32,7 @@ import org.apache.jena.sparql.exec.http.Params;
import org.apache.jena.sparql.exec.http.UpdateSendMode;
import org.apache.jena.sparql.syntax.syntaxtransform.UpdateTransformOps;
import org.apache.jena.sparql.util.Context;
+import org.apache.jena.sparql.util.ContextAccumulator;
import org.apache.jena.sparql.util.Symbol;
import org.apache.jena.sys.JenaSystem;
import org.apache.jena.update.Update;
@@ -56,8 +58,9 @@ public abstract class ExecUpdateHTTPBuilder<X, Y> {
/** Accumulator for update elements. Can build an overall string or
UpdateRequest from the elements. */
private class UpdateEltAcc implements Iterable<UpdateElt> {
- /** Delimiter for joining multiple SPARQL update strings into a single
one. */
- public static final String DELIMITER = ";\n";
+ /** Delimiter for joining multiple SPARQL update strings into a single
one.
+ * The delimiter takes into account that the last line of a statement
may be a single-line-comment. */
+ public static final String DELIMITER = "\n;\n";
private List<UpdateElt> updateOperations = new ArrayList<>();
private List<UpdateElt> updateOperationsView =
Collections.unmodifiableList(updateOperations);
@@ -76,6 +79,7 @@ public abstract class ExecUpdateHTTPBuilder<X, Y> {
add(new UpdateElt(update));
}
+ /** Add a string by parsing it. */
public void add(String updateRequestString) {
UpdateRequest updateRequest =
UpdateFactory.create(updateRequestString);
add(updateRequest);
@@ -146,9 +150,11 @@ public abstract class ExecUpdateHTTPBuilder<X, Y> {
protected UpdateSendMode sendMode = UpdateSendMode.systemDefault;
protected List<String> usingGraphURIs = null;
protected List<String> usingNamedGraphURIs = null;
- protected Context context = null;
+ private ContextAccumulator contextAcc =
ContextAccumulator.newBuilder(()->ARQ.getContext());
// Uses query rewrite to replace variables by values.
protected Map<Var, Node> substitutionMap = new HashMap<>();
+ protected long timeout = -1;
+ protected TimeUnit timeoutUnit = null;
protected ExecUpdateHTTPBuilder() {}
@@ -213,6 +219,12 @@ public abstract class ExecUpdateHTTPBuilder<X, Y> {
return thisBuilder();
}
+ public Y timeout(long timeout, TimeUnit timeoutUnit) {
+ this.timeout = timeout;
+ this.timeoutUnit = timeoutUnit;
+ return thisBuilder();
+ }
+
public Y httpClient(HttpClient httpClient) {
this.httpClient = Objects.requireNonNull(httpClient);
return thisBuilder();
@@ -274,28 +286,20 @@ public abstract class ExecUpdateHTTPBuilder<X, Y> {
public Y context(Context context) {
if ( context == null )
return thisBuilder();
- ensureContext();
- this.context.setAll(context);
+ this.contextAcc.context(context);
return thisBuilder();
}
public Y set(Symbol symbol, Object value) {
- ensureContext();
- this.context.set(symbol, value);
+ this.contextAcc.set(symbol, value);
return thisBuilder();
}
public Y set(Symbol symbol, boolean value) {
- ensureContext();
- this.context.set(symbol, value);
+ this.contextAcc.set(symbol, value);
return thisBuilder();
}
- private void ensureContext() {
- if ( context == null )
- context = Context.create();
- }
-
public X build() {
Objects.requireNonNull(serviceURL, "No service URL");
if ( updateEltAcc.isEmpty() )
@@ -322,7 +326,7 @@ public abstract class ExecUpdateHTTPBuilder<X, Y> {
// If the UpdateRequest object wasn't built until now then build the
string instead.
String updateStringActual = updateActual == null ?
updateEltAcc.buildString() : null;
- Context cxt = (context!=null) ? context : ARQ.getContext().copy();
+ Context cxt = contextAcc.context();
return buildX(hClient, updateActual, updateStringActual, cxt);
}
diff --git a/jena-arq/src/main/java/org/apache/jena/query/ARQ.java
b/jena-arq/src/main/java/org/apache/jena/query/ARQ.java
index 74d0a9361a..2d6649babf 100644
--- a/jena-arq/src/main/java/org/apache/jena/query/ARQ.java
+++ b/jena-arq/src/main/java/org/apache/jena/query/ARQ.java
@@ -198,6 +198,17 @@ public class ARQ
*/
public static final Symbol queryTimeout =
SystemARQ.allocSymbol("queryTimeout");
+ /**
+ * Set the update timeout. The value of this symbol gives the value of the timeout
in milliseconds.
+ * <ul>
+ * <li>A Number; the long value is used</li>
+ * <li>A string, e.g. "1000", parsed as a number</li>
+ * <li>A string, as two numbers separated by a comma, e.g. "500,10000"
parsed as two numbers</li>
+ * </ul>
+ * @see org.apache.jena.update.UpdateExecutionBuilder#timeout(long,
TimeUnit)
+ */
+ public static final Symbol updateTimeout =
SystemARQ.allocSymbol("updateTimeout");
+
// This can't be a context constant because NodeValues don't look in the
context.
// /**
// * Context symbol controlling Roman Numerals in Filters.
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/ExecutionContext.java
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/ExecutionContext.java
index 8d67401a3c..a79eea860b 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/ExecutionContext.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/ExecutionContext.java
@@ -24,10 +24,8 @@ import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jena.atlas.iterator.Iter;
-import org.apache.jena.atlas.logging.Log;
import org.apache.jena.graph.Graph;
import org.apache.jena.query.ARQ;
-import org.apache.jena.sparql.ARQConstants;
import org.apache.jena.sparql.core.DatasetGraph;
import org.apache.jena.sparql.engine.main.OpExecutor;
import org.apache.jena.sparql.engine.main.OpExecutorFactory;
@@ -78,18 +76,7 @@ public class ExecutionContext implements FunctionEnv
}
public ExecutionContext(Context params, Graph activeGraph, DatasetGraph
dataset, OpExecutorFactory factory) {
- this(params, activeGraph, dataset, factory,
cancellationSignal(params));
- }
-
- private static AtomicBoolean cancellationSignal(Context cxt) {
- if ( cxt == null )
- return null;
- try {
- return cxt.get(ARQConstants.symCancelQuery);
- } catch (ClassCastException ex) {
- Log.error(ExecutionContext.class, "Class cast exception: Expected
AtomicBoolean for cancel control: "+ex.getMessage());
- return null;
- }
+ this(params, activeGraph, dataset, factory,
Context.getCancelSignal(params));
}
private ExecutionContext(Context params, Graph activeGraph, DatasetGraph
dataset, OpExecutorFactory factory, AtomicBoolean cancelSignal) {
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/Timeouts.java
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/Timeouts.java
index c0f0586cbb..5eb30c690b 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/Timeouts.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/Timeouts.java
@@ -22,10 +22,15 @@ import java.util.concurrent.TimeUnit;
import org.apache.jena.atlas.lib.Pair;
import org.apache.jena.atlas.logging.Log;
+import org.apache.jena.query.ARQ;
+import org.apache.jena.sparql.util.Context;
+import org.apache.jena.sparql.util.Symbol;
/** Processing timeout strings. */
public class Timeouts {
+ private static final long UNSET_AMOUNT = -1;
+
public static Pair<Long, Long> parseTimeoutStr(String str, TimeUnit unit) {
try {
if ( str.contains(",") ) {
@@ -49,4 +54,194 @@ public class Timeouts {
return null;
}
}
+
+ public static record DurationWithUnit(long amount, TimeUnit unit) {
+ public static DurationWithUnit UNSET = new
DurationWithUnit(UNSET_AMOUNT, TimeUnit.MILLISECONDS);
+
+ public boolean isSet() {
+ return amount >= 0;
+ }
+
+ public long asMillis() {
+ return (amount < 0) ? amount : unit.toMillis(amount);
+ }
+
+ /** Create an instance with normalized values: negative amounts become
-1 and a null unit is turned into milliseconds. */
+ public static DurationWithUnit of(long amount, TimeUnit unit) {
+ return new DurationWithUnit(amount < 0 ? -1 : amount,
nullToMillis(unit));
+ }
+ }
+
+ public static record Timeout(DurationWithUnit initialTimeout,
DurationWithUnit overallTimeout) {
+ public static Timeout UNSET = new Timeout(UNSET_AMOUNT, UNSET_AMOUNT);
+
+ public Timeout(long initialTimeout, TimeUnit initialTimeoutUnit, long
overallTimeout, TimeUnit overallTimeoutUnit) {
+ this(DurationWithUnit.of(initialTimeout, initialTimeoutUnit),
DurationWithUnit.of(overallTimeout, overallTimeoutUnit));
+ }
+
+ public Timeout(long initialTimeout, long overallTimeout) {
+ this(initialTimeout, TimeUnit.MILLISECONDS, overallTimeout,
TimeUnit.MILLISECONDS);
+ }
+
+ public boolean hasInitialTimeout() {
+ return initialTimeout().isSet();
+ }
+
+ public long initialTimeoutMillis() {
+ return initialTimeout().asMillis();
+ }
+
+ public boolean hasOverallTimeout() {
+ return overallTimeout().isSet();
+ }
+
+ public long overallTimeoutMillis() {
+ return overallTimeout().asMillis();
+ }
+
+ public boolean hasTimeout() {
+ return hasInitialTimeout() || hasOverallTimeout();
+ }
+ }
+
+ // TimeoutBuilder reserved as a possible super-interface for {Query,
Update}Exec(ution)Builder.
+ public static class TimeoutBuilderImpl {
+ protected long initialTimeout = UNSET_AMOUNT;
+ protected TimeUnit initialTimeoutUnit = null;
+ protected long overallTimeout = UNSET_AMOUNT;
+ protected TimeUnit overallTimeoutUnit = null;
+
+ /** Overwrite this builder's state with that of the argument. */
+ public TimeoutBuilderImpl timeout(Timeout timeout) {
+ initialTimeout(timeout.initialTimeout().amount(),
timeout.initialTimeout().unit());
+ overallTimeout(timeout.overallTimeout().amount(),
timeout.overallTimeout().unit());
+ return this;
+ }
+
+ public TimeoutBuilderImpl timeout(long value, TimeUnit timeUnit) {
+ initialTimeout(UNSET_AMOUNT, null);
+ overallTimeout(value, timeUnit);
+ return this;
+ }
+
+ public TimeoutBuilderImpl initialTimeout(long value, TimeUnit
timeUnit) {
+ this.initialTimeout = value < 0 ? UNSET_AMOUNT : value ;
+ this.initialTimeoutUnit = timeUnit;
+ return this;
+ }
+
+ public boolean hasInitialTimeout() {
+ return initialTimeout >= 0;
+ }
+
+ public TimeoutBuilderImpl overallTimeout(long value, TimeUnit
timeUnit) {
+ this.overallTimeout = value;
+ this.overallTimeoutUnit = timeUnit;
+ return this;
+ }
+
+ public boolean hasOverallTimeout() {
+ return overallTimeout >= 0;
+ }
+
+ public Timeout build() {
+ return new Timeout(initialTimeout,
nullToMillis(initialTimeoutUnit), overallTimeout,
nullToMillis(overallTimeoutUnit));
+ }
+ }
+
+ /** Update any unset timeout in the builder from the specification object.
*/
+ public static void applyDefaultTimeout(TimeoutBuilderImpl builder, Timeout
timeout) {
+ if (timeout != null) {
+ if ( !builder.hasInitialTimeout() )
+ builder.initialTimeout(timeout.initialTimeout().amount(),
timeout.initialTimeout().unit());
+ if ( !builder.hasOverallTimeout() )
+ builder.overallTimeout(timeout.overallTimeout().amount(),
timeout.overallTimeout().unit());
+ }
+ }
+
+ public static Timeout extractQueryTimeout(Context cxt) {
+ return extractTimeout(cxt, ARQ.queryTimeout);
+ }
+
+ public static Timeout extractUpdateTimeout(Context cxt) {
+ return extractTimeout(cxt, ARQ.updateTimeout);
+ }
+
+ public static Timeout extractTimeout(Context cxt, Symbol symbol) {
+ Object obj = cxt.get(symbol);
+ return parseTimeout(obj);
+ }
+
+ /** Creates a timeout instance from the object. Never returns null. */
+ public static Timeout parseTimeout(Object obj) {
+ Timeout result = Timeout.UNSET;
+ if ( obj != null ) {
+ try {
+ if ( obj instanceof Timeout to ) {
+ result = to;
+ } else if ( obj instanceof Number n ) {
+ long x = n.longValue();
+ result = new Timeout(UNSET_AMOUNT, x);
+ } else if ( obj instanceof String str ) {
+ Pair<Long, Long> pair = Timeouts.parseTimeoutStr(str,
TimeUnit.MILLISECONDS);
+ if ( pair == null ) {
+ Log.warn(Timeouts.class, "Bad timeout string: "+str);
+ return result;
+ }
+ result = new Timeout(pair.getLeft(), pair.getRight());
+ } else
+ Log.warn(Timeouts.class, "Can't interpret timeout: " +
obj);
+ } catch (Exception ex) {
+ Log.warn(Timeouts.class, "Exception setting timeouts (context)
from: "+obj, ex);
+ }
+ }
+ return result;
+ }
+
+ public static void setQueryTimeout(Context cxt, Timeout timeout) {
+ setTimeout(cxt, ARQ.queryTimeout, timeout);
+ }
+
+ public static void setUpdateTimeout(Context cxt, Timeout timeout) {
+ setTimeout(cxt, ARQ.updateTimeout, timeout);
+ }
+
+ public static void setTimeout(Context cxt, Symbol symbol, Timeout timeout)
{
+ Object obj = toContextValue(timeout);
+ cxt.set(symbol, obj);
+ }
+
+ /** Inverse function of {@link #parseTimeout(Object)}. */
+ public static Object toContextValue(Timeout timeout) {
+ Object result = timeout == null
+ ? null
+ : timeout.hasInitialTimeout()
+ ? toString(timeout)
+ : timeout.hasOverallTimeout()
+ ? timeout.overallTimeoutMillis()
+ : null;
+ return result;
+ }
+
+ /** Inverse function of {@link #parseTimeout(Object)}. */
+ public static String toString(Timeout timeout) {
+ String result = timeout.hasInitialTimeout()
+ ? timeout.initialTimeoutMillis() + "," +
timeout.overallTimeoutMillis()
+ : timeout.hasOverallTimeout()
+ ? Long.toString(timeout.overallTimeoutMillis())
+ : null;
+ return result;
+ }
+
+ // Set times from context if not set directly, e.g. the Context provides
default values.
+ // Contrast with SPARQLQueryProcessor where the context is limiting values
of the protocol parameter.
+ public static void applyDefaultQueryTimeoutFromContext(TimeoutBuilderImpl
builder, Context cxt) {
+ Timeout queryTimeout = extractQueryTimeout(cxt);
+ applyDefaultTimeout(builder, queryTimeout);
+ }
+
+ /** Returns milliseconds if the given time unit is null. */
+ private static TimeUnit nullToMillis(TimeUnit unit) {
+ return unit != null ? unit : TimeUnit.MILLISECONDS;
+ }
}
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterProcessBinding.java
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterProcessBinding.java
index 72b1e045e1..8d77b586a6 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterProcessBinding.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterProcessBinding.java
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jena.atlas.lib.Lib ;
import org.apache.jena.query.QueryCancelledException;
-import org.apache.jena.sparql.ARQConstants;
import org.apache.jena.sparql.ARQInternalErrorException ;
import org.apache.jena.sparql.engine.ExecutionContext ;
import org.apache.jena.sparql.engine.QueryIterator ;
@@ -49,7 +48,7 @@ public abstract class QueryIterProcessBinding extends
QueryIter1 {
nextBinding = null ;
AtomicBoolean signal;
try {
- signal = context.getContext().get(ARQConstants.symCancelQuery);
+ signal = context.getCancelSignal();
} catch(Exception ex) {
signal = null;
}
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIteratorBase.java
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIteratorBase.java
index e58361ea3c..f156b065d2 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIteratorBase.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIteratorBase.java
@@ -70,7 +70,7 @@ public abstract class QueryIteratorBase
}
private boolean requestingCancel() {
- return requestingCancel != null && requestingCancel.get() ;
+ return (requestingCancel != null && requestingCancel.get()) ||
Thread.interrupted() ;
}
private void haveCancelled() {}
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java
index d4029f3935..f79786f499 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java
@@ -51,6 +51,7 @@ import org.apache.jena.sparql.engine.QueryIterator;
import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.engine.binding.BindingFactory;
import org.apache.jena.sparql.engine.iterator.QueryIteratorWrapper;
+import org.apache.jena.sparql.engine.Timeouts.Timeout;
import org.apache.jena.sparql.graph.GraphOps;
import org.apache.jena.sparql.modify.TemplateLib;
import org.apache.jena.sparql.syntax.ElementGroup;
@@ -87,11 +88,17 @@ public class QueryExecDataset implements QueryExec
private long timeout2 = TIMEOUT_UNSET;
private final AlarmClock alarmClock = AlarmClock.get();
private long queryStartTime = -1; // Unset
- private AtomicBoolean cancelSignal = new
AtomicBoolean(false);
+ private AtomicBoolean cancelSignal;
+ @Deprecated
protected QueryExecDataset(Query query, String queryString, DatasetGraph
datasetGraph, Context cxt,
- QueryEngineFactory qeFactory,
- long timeout1, TimeUnit timeUnit1, long
timeout2, TimeUnit timeUnit2,
+ QueryEngineFactory qeFactory, long timeout1, TimeUnit timeUnit1,
long timeout2, TimeUnit timeUnit2,
+ Binding initialToEngine) {
+ this(query, queryString, datasetGraph, cxt, qeFactory, new
Timeout(timeout1, timeUnit1, timeout2, timeUnit2), initialToEngine);
+ }
+
+ protected QueryExecDataset(Query query, String queryString, DatasetGraph
datasetGraph, Context cxt,
+ QueryEngineFactory qeFactory, Timeout timeout,
Binding initialToEngine) {
// Context cxt is already a safe copy.
this.query = query;
@@ -99,10 +106,14 @@ public class QueryExecDataset implements QueryExec
this.dataset = datasetGraph;
this.qeFactory = qeFactory;
this.context = (cxt == null) ? Context.setupContextForDataset(cxt,
datasetGraph) : cxt;
- this.timeout1 = asMillis(timeout1, timeUnit1);
- this.timeout2 = asMillis(timeout2, timeUnit2);
+ this.timeout1 = timeout.initialTimeoutMillis();
+ this.timeout2 = timeout.overallTimeoutMillis();
// See also query substitution handled in QueryExecBuilder
this.initialBinding = initialToEngine;
+
+ // Cancel signal may originate from e.g. an update execution.
+ this.cancelSignal = Context.getOrSetCancelSignal(context);
+
init();
}
@@ -112,10 +123,6 @@ public class QueryExecDataset implements QueryExec
context.put(ARQConstants.sysCurrentQuery, query);
}
- private static long asMillis(long duration, TimeUnit timeUnit) {
- return (duration < 0) ? duration : timeUnit.toMillis(duration);
- }
-
@Override
public void close() {
closed = true;
@@ -468,16 +475,6 @@ public class QueryExecDataset implements QueryExec
return;
}
- // JENA-2141 - the timeout can go off while building the query
iterator structure.
- // In this case, use a signal passed through the context.
- // We don't know if getPlan().iterator() does a lot of work or not
- // (ideally it shouldn't start executing the query but in some
sub-systems
- // it might be necessary)
- //
- // This applies to the time to first result because to get the first
result, the
- // queryIterator must have been built. So it does not apply for the
second
- // stage of N,-1 or N,M.
- context.set(ARQConstants.symCancelQuery, cancelSignal);
TimeoutCallback callback = new TimeoutCallback() ;
expectedCallback.set(callback) ;
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDatasetBuilder.java
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDatasetBuilder.java
index eb0f705366..23e003009a 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDatasetBuilder.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDatasetBuilder.java
@@ -23,7 +23,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
-import org.apache.jena.atlas.lib.Pair;
import org.apache.jena.atlas.logging.Log;
import org.apache.jena.graph.Graph;
import org.apache.jena.graph.Node;
@@ -34,8 +33,10 @@ import org.apache.jena.sparql.core.DatasetGraphFactory;
import org.apache.jena.sparql.core.Var;
import org.apache.jena.sparql.engine.QueryEngineFactory;
import org.apache.jena.sparql.engine.QueryEngineRegistry;
-import org.apache.jena.sparql.engine.Timeouts;
import org.apache.jena.sparql.engine.binding.Binding;
+import org.apache.jena.sparql.engine.Timeouts;
+import org.apache.jena.sparql.engine.Timeouts.Timeout;
+import org.apache.jena.sparql.engine.Timeouts.TimeoutBuilderImpl;
import org.apache.jena.sparql.syntax.syntaxtransform.QueryTransformOps;
import org.apache.jena.sparql.util.Context;
import org.apache.jena.sparql.util.ContextAccumulator;
@@ -69,10 +70,7 @@ public class QueryExecDatasetBuilder implements
QueryExecMod, QueryExecBuilder {
// Uses initial binding to execution (old, original) feature
private Binding initialBinding = null;
- private long initialTimeout = UNSET;
- private TimeUnit initialTimeoutUnit = null;
- private long overallTimeout = UNSET;
- private TimeUnit overallTimeoutUnit = null;
+ private TimeoutBuilderImpl timeoutBuilder = new TimeoutBuilderImpl();
private QueryExecDatasetBuilder() { }
@@ -166,60 +164,22 @@ public class QueryExecDatasetBuilder implements
QueryExecMod, QueryExecBuilder {
@Override
public QueryExecDatasetBuilder timeout(long value, TimeUnit timeUnit) {
- this.initialTimeout = UNSET;
- this.initialTimeoutUnit = null;
- this.overallTimeout = value;
- this.overallTimeoutUnit = timeUnit;
+ timeoutBuilder.timeout(value, timeUnit);
return this;
}
@Override
public QueryExecDatasetBuilder initialTimeout(long value, TimeUnit
timeUnit) {
- this.initialTimeout = value < 0 ? -1L : value ;
- this.initialTimeoutUnit = timeUnit;
+ timeoutBuilder.initialTimeout(value, timeUnit);
return this;
}
@Override
public QueryExecDatasetBuilder overallTimeout(long value, TimeUnit
timeUnit) {
- this.overallTimeout = value;
- this.overallTimeoutUnit = timeUnit;
+ timeoutBuilder.overallTimeout(value, timeUnit);
return this;
}
- // Set times from context if not set directly. e..g Context provides
default values.
- // Contrast with SPARQLQueryProcessor where the context is limiting values
of the protocol parameter.
- private static void defaultTimeoutsFromContext(QueryExecDatasetBuilder
builder, Context cxt) {
- applyTimeouts(builder, cxt.get(ARQ.queryTimeout));
- }
-
- /** Take obj, find the timeout(s) and apply to the builder */
- private static void applyTimeouts(QueryExecDatasetBuilder builder, Object
obj) {
- if ( obj == null )
- return ;
- try {
- if ( obj instanceof Number ) {
- long x = ((Number)obj).longValue();
- if ( builder.overallTimeout < 0 )
- builder.overallTimeout(x, TimeUnit.MILLISECONDS);
- } else if ( obj instanceof String ) {
- String str = obj.toString();
- Pair<Long, Long> pair = Timeouts.parseTimeoutStr(str,
TimeUnit.MILLISECONDS);
- if ( pair == null ) {
- Log.warn(builder, "Bad timeout string: "+str);
- return ;
- }
- if ( builder.initialTimeout < 0 )
- builder.initialTimeout(pair.getLeft(),
TimeUnit.MILLISECONDS);
- if ( builder.overallTimeout < 0 )
- builder.overallTimeout(pair.getRight(),
TimeUnit.MILLISECONDS);
- } else
- Log.warn(builder, "Can't interpret timeout: " + obj);
- } catch (Exception ex) {
- Log.warn(builder, "Exception setting timeouts (context) from:
"+obj);
- }
- }
-
@Override
public QueryExec build() {
Objects.requireNonNull(query, "No query for QueryExec");
@@ -243,17 +203,17 @@ public class QueryExecDatasetBuilder implements
QueryExecMod, QueryExecBuilder {
queryStringActual = null;
}
- defaultTimeoutsFromContext(this, cxt);
+ Timeouts.applyDefaultQueryTimeoutFromContext(this.timeoutBuilder, cxt);
if ( dataset != null )
cxt.set(ARQConstants.sysCurrentDataset,
DatasetFactory.wrap(dataset));
if ( queryActual != null )
cxt.set(ARQConstants.sysCurrentQuery, queryActual);
+ Timeout timeout = timeoutBuilder.build();
+
QueryExec qExec = new QueryExecDataset(queryActual, queryStringActual,
dataset, cxt, qeFactory,
- initialTimeout,
initialTimeoutUnit,
- overallTimeout,
overallTimeoutUnit,
- initialBinding);
+ timeout, initialBinding);
return qExec;
}
}
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecAdapter.java
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecAdapter.java
index fd23e63af9..b97c2af042 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecAdapter.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecAdapter.java
@@ -18,6 +18,7 @@
package org.apache.jena.sparql.exec;
+import org.apache.jena.sparql.util.Context;
import org.apache.jena.update.UpdateExecution;
public class UpdateExecAdapter implements UpdateExec {
@@ -39,4 +40,14 @@ public class UpdateExecAdapter implements UpdateExec {
@Override
public void execute() { updateProc.execute(); }
+
+ @Override
+ public Context getContext() {
+ return updateProc.getContext();
+ }
+
+ @Override
+ public void abort() {
+ updateProc.abort();
+ }
}
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilder.java
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilder.java
index 46434ed84f..02f56413c9 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilder.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilder.java
@@ -18,6 +18,8 @@
package org.apache.jena.sparql.exec;
+import java.util.concurrent.TimeUnit;
+
import org.apache.jena.graph.Node;
import org.apache.jena.query.ARQ;
import org.apache.jena.sparql.core.Var;
@@ -65,6 +67,8 @@ public interface UpdateExecBuilder {
return substitution(Var.alloc(var), value);
}
+ public UpdateExecBuilder timeout(long value, TimeUnit timeUnit);
+
public UpdateExec build();
/** Build and execute. */
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilderAdapter.java
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilderAdapter.java
index 1355939aa6..461373bb0e 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilderAdapter.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecBuilderAdapter.java
@@ -19,6 +19,7 @@
package org.apache.jena.sparql.exec;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import org.apache.jena.graph.Node;
import org.apache.jena.sparql.core.ResultBinding;
@@ -118,6 +119,12 @@ public class UpdateExecBuilderAdapter
return this;
}
+ @Override
+ public UpdateExecBuilder timeout(long timeout, TimeUnit timeoutUnit) {
+ builder = builder.timeout(timeout, timeoutUnit);
+ return this;
+ }
+
@Override
public UpdateExec build() {
UpdateExecution updateExec = builder.build();
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDataset.java
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDataset.java
index b6f0eed827..1443aebdf8 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDataset.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDataset.java
@@ -20,6 +20,7 @@ package org.apache.jena.sparql.exec;
import org.apache.jena.sparql.core.DatasetGraph;
import org.apache.jena.sparql.engine.binding.Binding;
+import org.apache.jena.sparql.engine.Timeouts.Timeout;
import org.apache.jena.sparql.modify.UpdateEngineFactory;
import org.apache.jena.sparql.modify.UpdateProcessorBase;
import org.apache.jena.sparql.util.Context;
@@ -28,8 +29,8 @@ import org.apache.jena.update.UpdateRequest;
public class UpdateExecDataset extends UpdateProcessorBase implements
UpdateExec {
protected UpdateExecDataset(UpdateRequest request, DatasetGraph
datasetGraph,
- Binding inputBinding, Context context,
UpdateEngineFactory factory) {
- super(request, datasetGraph, inputBinding, context, factory);
+ Binding inputBinding, Context context,
UpdateEngineFactory factory, Timeout timeout) {
+ super(request, datasetGraph, inputBinding, context, factory, timeout);
}
@Override
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDatasetBuilder.java
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDatasetBuilder.java
index 52979eedf7..f58c9b3bda 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDatasetBuilder.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDatasetBuilder.java
@@ -21,16 +21,20 @@ package org.apache.jena.sparql.exec;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import org.apache.jena.graph.Node;
-import org.apache.jena.query.Query;
+import org.apache.jena.query.ARQ;
import org.apache.jena.sparql.core.DatasetGraph;
import org.apache.jena.sparql.core.Var;
import org.apache.jena.sparql.engine.binding.Binding;
+import org.apache.jena.sparql.engine.Timeouts.Timeout;
+import org.apache.jena.sparql.engine.Timeouts.TimeoutBuilderImpl;
import org.apache.jena.sparql.modify.UpdateEngineFactory;
import org.apache.jena.sparql.modify.UpdateEngineRegistry;
import org.apache.jena.sparql.syntax.syntaxtransform.UpdateTransformOps;
import org.apache.jena.sparql.util.Context;
+import org.apache.jena.sparql.util.ContextAccumulator;
import org.apache.jena.sparql.util.Symbol;
import org.apache.jena.update.Update;
import org.apache.jena.update.UpdateException;
@@ -41,15 +45,18 @@ public class UpdateExecDatasetBuilder implements
UpdateExecBuilder {
public static UpdateExecDatasetBuilder create() { return new
UpdateExecDatasetBuilder(); }
- private DatasetGraph dataset = null;
- private Query query = null;
- private Context context = null;
+ private DatasetGraph dataset = null;
+ private ContextAccumulator contextAcc =
ContextAccumulator.newBuilder(()->ARQ.getContext(),
()->Context.fromDataset(dataset));
+
// Uses query rewrite to replace variables by values.
- private Map<Var, Node> substitutionMap = null;
+ private Map<Var, Node> substitutionMap = null;
+
+ private Binding initialBinding = null;
+
+ private TimeoutBuilderImpl timeoutBuilder = new TimeoutBuilderImpl();
- private Binding initialBinding = null;
- private UpdateRequest update = null;
- private UpdateRequest updateRequest = new UpdateRequest();
+ private UpdateRequest update = null;
+ private UpdateRequest updateRequest = new UpdateRequest();
private UpdateExecDatasetBuilder() {}
@@ -96,29 +103,24 @@ public class UpdateExecDatasetBuilder implements
UpdateExecBuilder {
public UpdateExecDatasetBuilder context(Context context) {
if ( context == null )
return this;
- ensureContext();
- this.context.putAll(context);
+ this.contextAcc.context(context);
return this;
}
@Override
public UpdateExecDatasetBuilder set(Symbol symbol, Object value) {
- ensureContext();
- this.context.set(symbol, value);
+ this.contextAcc.set(symbol, value);
return this;
}
@Override
public UpdateExecDatasetBuilder set(Symbol symbol, boolean value) {
- ensureContext();
- this.context.set(symbol, value);
+ this.contextAcc.set(symbol, value);
return this;
}
-
- private void ensureContext() {
- if ( context == null )
- context = new Context();
+ public Context getContext() {
+ return contextAcc.context();
}
@Override
@@ -140,6 +142,12 @@ public class UpdateExecDatasetBuilder implements
UpdateExecBuilder {
substitutionMap = new HashMap<>();
}
+ /**
+ * Records the timeout on this builder's {@code timeoutBuilder} and returns
+ * this builder for chaining. {@code build()} later turns the accumulated
+ * settings into the {@code Timeout} handed to {@code UpdateExecDataset}.
+ */
+ @Override
+ public UpdateExecDatasetBuilder timeout(long timeout, TimeUnit
timeoutUnit) {
+ this.timeoutBuilder.timeout(timeout, timeoutUnit);
+ return this;
+ }
+
/** Use {@link #substitution(Binding)} */
@Deprecated
public UpdateExecDatasetBuilder initialBinding(Binding initialBinding) {
@@ -157,11 +165,14 @@ public class UpdateExecDatasetBuilder implements
UpdateExecBuilder {
if ( substitutionMap != null && ! substitutionMap.isEmpty() )
actualUpdate = UpdateTransformOps.transform(actualUpdate,
substitutionMap);
- Context cxt = Context.setupContextForDataset(context, dataset);
+ Context cxt = getContext();
UpdateEngineFactory f = UpdateEngineRegistry.get().find(dataset, cxt);
if ( f == null )
throw new UpdateException("Failed to find an UpdateEngine");
- UpdateExec uExec = new UpdateExecDataset(actualUpdate, dataset,
initialBinding, cxt, f);
+
+ Timeout timeout = timeoutBuilder.build();
+
+ UpdateExec uExec = new UpdateExecDataset(actualUpdate, dataset,
initialBinding, cxt, f, timeout);
return uExec;
}
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionAdapter.java
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionAdapter.java
index 967a473166..7123b9c32f 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionAdapter.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionAdapter.java
@@ -18,6 +18,7 @@
package org.apache.jena.sparql.exec;
+import org.apache.jena.sparql.util.Context;
import org.apache.jena.update.UpdateExecution;
public class UpdateExecutionAdapter implements UpdateExecution {
@@ -40,4 +41,13 @@ public class UpdateExecutionAdapter implements
UpdateExecution {
@Override
public void execute() { updateExec.execute(); }
+ @Override
+ public Context getContext() {
+ return updateExec.getContext();
+ }
+
+ @Override
+ public void abort() {
+ updateExec.abort();
+ }
}
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionBuilderAdapter.java
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionBuilderAdapter.java
index 48240e6822..88e91e98c4 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionBuilderAdapter.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecutionBuilderAdapter.java
@@ -18,6 +18,8 @@
package org.apache.jena.sparql.exec;
+import java.util.concurrent.TimeUnit;
+
import org.apache.jena.query.QuerySolution;
import org.apache.jena.rdf.model.RDFNode;
import org.apache.jena.sparql.engine.binding.Binding;
@@ -103,6 +105,12 @@ public class UpdateExecutionBuilderAdapter implements
UpdateExecutionBuilder {
return this;
}
+ /**
+ * Passes the timeout on to the wrapped builder and returns this adapter for
+ * chaining. The value returned by the wrapped builder is not used.
+ */
+ @Override
+ public UpdateExecutionBuilder timeout(long value, TimeUnit timeUnit) {
+ builder.timeout(value, timeUnit);
+ return this;
+ }
+
@Override
public UpdateExecution build() {
return UpdateExecutionAdapter.adapt(builder.build());
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java
index beb947f4f0..1d5e08f59a 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java
@@ -114,7 +114,7 @@ public class QueryExecHTTP implements QueryExec {
private String httpResponseContentType = null;
// Releasing HTTP input streams is important. We remember this for SELECT
result
// set streaming, and will close it when the execution is closed
- private InputStream retainedConnection = null;
+ private volatile InputStream retainedConnection = null;
private HttpClient httpClient = HttpEnv.getDftHttpClient();
private Map<String, String> httpHeaders;
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java
index e39fac4d93..1269083107 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java
@@ -29,7 +29,10 @@ import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.jena.atlas.logging.Log;
import org.apache.jena.http.HttpEnv;
import org.apache.jena.http.HttpLib;
import org.apache.jena.riot.WebContent;
@@ -59,13 +62,19 @@ public class UpdateExecHTTP implements UpdateExec {
private final Params params;
private final List<String> usingGraphURIs;
private final List<String> usingNamedGraphURIs;
+ private final long timeout;
+ private final TimeUnit timeoutUnit;
+
+ private AtomicBoolean cancelSignal = new AtomicBoolean(false);
+ private volatile InputStream retainedConnection = null;
/*package*/ UpdateExecHTTP(String serviceURL, UpdateRequest update, String
updateString,
HttpClient httpClient, Params params,
List<String> usingGraphURIs,
List<String> usingNamedGraphURIs,
Map<String, String> httpHeaders, UpdateSendMode
sendMode,
- Context context) {
+ Context context,
+ long timeout, TimeUnit timeoutUnit) {
this.context = context;
this.service = serviceURL;
//this.update = update;
@@ -77,17 +86,14 @@ public class UpdateExecHTTP implements UpdateExec {
this.usingNamedGraphURIs = usingNamedGraphURIs;
this.httpHeaders = httpHeaders;
this.sendMode = sendMode;
+ this.timeout = timeout;
+ this.timeoutUnit = timeoutUnit;
}
-// @Override
-// public Context getContext() {
-// return null;
-// }
-//
-// @Override
-// public DatasetGraph getDatasetGraph() {
-// return null;
-// }
+ @Override
+ public Context getContext() {
+ return context;
+ }
@Override
public void execute() {
@@ -130,13 +136,40 @@ public class UpdateExecHTTP implements UpdateExec {
}
private String executeUpdate(String requestURL, BodyPublisher body, String
contentType) {
- HttpRequest.Builder builder = HttpLib.requestBuilder(requestURL,
httpHeaders, -1L, null);
+ HttpRequest.Builder builder = HttpLib.requestBuilder(requestURL,
httpHeaders, timeout, timeoutUnit);
builder = contentTypeHeader(builder, contentType);
HttpRequest request = builder.POST(body).build();
logUpdate(updateString, request);
HttpResponse<InputStream> response = HttpLib.execute(httpClient,
request);
- return handleResponseRtnString(response);
+ return HttpLib.handleResponseRtnString(response,
this::setRetainedConnection);
+ }
+
+ /**
+ * Remembers the HTTP response stream so that {@code abort()} can close it.
+ * Handed to {@code HttpLib.handleResponseRtnString} as a callback by
+ * {@code executeUpdate}.
+ * Runs while holding the {@code cancelSignal} monitor, which is the same
+ * monitor {@code abort()} takes; if the cancel signal is already set the
+ * stream is closed right away by calling {@code abort()}.
+ */
+ private void setRetainedConnection(InputStream in) {
+ synchronized (cancelSignal) {
+ retainedConnection = in;
+ // An abort was requested before the stream became available: honour it now.
+ if (cancelSignal.get()) {
+ abort();
+ }
+ }
}
private static void logUpdate(String updateString, HttpRequest request) {}
+
+    /**
+     * Best-effort abort: raises the cancel signal and closes the retained HTTP
+     * response stream when one has been recorded.
+     * May still hang waiting for the HTTP request to complete.
+     * A failure while closing is logged as a warning and not propagated.
+     */
+    @Override
+    public void abort() {
+        cancelSignal.set(true);
+        synchronized (cancelSignal) {
+            InputStream conn = retainedConnection;
+            if (conn == null) {
+                // Nothing retained (yet); setRetainedConnection checks the signal later.
+                return;
+            }
+            try {
+                conn.close();
+                // Only forget the stream once it has been closed successfully.
+                retainedConnection = null;
+            } catch (Exception ex) {
+                Log.warn(this, "Error during abort", ex);
+            }
+        }
+    }
}
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTPBuilder.java
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTPBuilder.java
index fb11b20985..c38ff2c72c 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTPBuilder.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTPBuilder.java
@@ -45,6 +45,6 @@ public class UpdateExecHTTPBuilder extends
ExecUpdateHTTPBuilder<UpdateExecHTTP,
copyArray(usingGraphURIs),
copyArray(usingNamedGraphURIs),
new HashMap<>(httpHeaders),
- sendMode, cxt);
+ sendMode, cxt, timeout, timeoutUnit);
}
}
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecutionHTTPBuilder.java
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecutionHTTPBuilder.java
index 5d6350c040..f4aecfb0b7 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecutionHTTPBuilder.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecutionHTTPBuilder.java
@@ -52,7 +52,8 @@ public class UpdateExecutionHTTPBuilder
copyArray(usingGraphURIs),
copyArray(usingNamedGraphURIs),
new HashMap<>(httpHeaders),
- sendMode, cxt);
+ sendMode, cxt,
+ timeout, timeoutUnit);
return new UpdateExecutionHTTP(uExec);
}
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngine.java
b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngine.java
index d8ef8284dd..74171307c5 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngine.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngine.java
@@ -30,12 +30,12 @@ public interface UpdateEngine
* Signal start of a request being executed
*/
public void startRequest();
-
+
/**
- * Signal end of a request being executed
+ * Signal end of a request being executed
*/
public void finishRequest();
-
+
/**
* Returns an {@link UpdateSink} that accepts Update operations
*/
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineBase.java
b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineBase.java
index 18498224c3..c521ea63c8 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineBase.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineBase.java
@@ -18,7 +18,6 @@
package org.apache.jena.sparql.modify;
-import org.apache.jena.query.ARQ ;
import org.apache.jena.sparql.ARQConstants ;
import org.apache.jena.sparql.core.DatasetGraph ;
import org.apache.jena.sparql.engine.binding.Binding ;
@@ -39,18 +38,16 @@ public abstract class UpdateEngineBase implements
UpdateEngine
this.inputBinding = inputBinding ;
this.context = setupContext(context, datasetGraph) ;
}
-
- private static Context setupContext(Context context, DatasetGraph dataset)
+
+ private Context setupContext(Context cxt, DatasetGraph dataset)
{
- // To many copies?
- if ( context == null ) // Copy of global context to protect
against change.
- context = ARQ.getContext() ;
- context = context.copy() ;
+ // The following setup is effectively the same as in QueryEngineBase
+ Context result = cxt;
+
+ if ( result == null )
+ result = Context.setupContextForDataset(cxt, dataset);
- if ( dataset.getContext() != null )
- context.putAll(dataset.getContext()) ;
-
- context.set(ARQConstants.sysCurrentTime,
NodeFactoryExtra.nowAsDateTime()) ;
- return context ;
+ result.set(ARQConstants.sysCurrentTime,
NodeFactoryExtra.nowAsDateTime()) ;
+ return result ;
}
}
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineMain.java
b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineMain.java
index 597e1c0b9c..45b23e387f 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineMain.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineMain.java
@@ -38,7 +38,7 @@ import org.apache.jena.sparql.util.Context ;
* See {@link UpdateEngineNonStreaming} for a subclass that accumulates
updates, including during
* parsing then executes the operation.
*/
-public class UpdateEngineMain extends UpdateEngineBase
+public class UpdateEngineMain extends UpdateEngineBase
{
/**
* Creates a new Update Engine
@@ -53,12 +53,12 @@ public class UpdateEngineMain extends UpdateEngineBase
@Override
public void startRequest() {}
-
+
@Override
public void finishRequest() {}
-
+
private UpdateSink updateSink = null ;
-
+
/*
* Returns the {@link UpdateSink}. In this implementation, this is done by
with
* an {@link UpdateVisitor} which will visit each update operation and
send the
@@ -71,11 +71,11 @@ public class UpdateEngineMain extends UpdateEngineBase
{
if ( updateSink == null )
updateSink = new UpdateVisitorSink(this.prepareWorker(),
- sink(q->datasetGraph.add(q)),
+ sink(q->datasetGraph.add(q)),
sink(q->datasetGraph.delete(q)));
return updateSink ;
}
-
+
/**
* Creates the {@link UpdateVisitor} which will do the work of applying
the updates
* @return The update visitor to be used to apply the updates
@@ -84,18 +84,18 @@ public class UpdateEngineMain extends UpdateEngineBase
return new UpdateEngineWorker(datasetGraph, inputBinding, context) ;
}
- /** Direct a sink to a Consumer. */
+ /** Direct a sink to a Consumer. */
private <X> Sink<X> sink(Consumer<X> action) {
return new Sink<X>() {
@Override
public void send(X item) { action.accept(item); }
- @Override public void close() {}
+ @Override public void close() {}
@Override public void flush() {}
- };
+ };
}
-
+
private static UpdateEngineFactory factory = new UpdateEngineFactory()
{
@Override
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineWorker.java
b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineWorker.java
index 5c4eeab04b..40a443bba7 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineWorker.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateEngineWorker.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.jena.atlas.data.BagFactory;
import org.apache.jena.atlas.data.DataBag;
@@ -45,6 +46,8 @@ import org.apache.jena.sparql.core.*;
import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.engine.binding.BindingRoot;
import org.apache.jena.sparql.exec.*;
+import org.apache.jena.sparql.engine.Timeouts;
+import org.apache.jena.sparql.engine.Timeouts.Timeout;
import org.apache.jena.sparql.graph.GraphFactory;
import org.apache.jena.sparql.graph.GraphOps;
import org.apache.jena.sparql.modify.request.*;
@@ -64,10 +67,42 @@ public class UpdateEngineWorker implements UpdateVisitor
protected final Binding inputBinding; // Used for UpdateModify only:
substitution is better.
protected final Context context;
+ protected final Timeout timeout;
+
+ /** Used to compute the remaining overall time that may be spent in query
execution. */
+ protected long startTimeMillis = -1;
+
+ /** The currently executing query exec. */
+ protected final AtomicBoolean cancelSignal;
+ protected volatile QueryExec activeQExec = null;
+
public UpdateEngineWorker(DatasetGraph datasetGraph, Binding inputBinding,
Context context) {
this.datasetGraph = datasetGraph;
this.inputBinding = inputBinding;
this.context = context;
+ this.timeout = Timeouts.extractUpdateTimeout(context);
+ this.cancelSignal = Context.getOrSetCancelSignal(context);
+ }
+
+ /**
+ * Requests cancellation of this worker.
+ * Flips the cancel signal from false to true; only the call that performs
+ * that flip goes on to abort the currently registered query execution, if
+ * there is one. The check of {@code activeQExec} happens under this worker's
+ * monitor.
+ * NOTE(review): when the signal was already set elsewhere, this method does
+ * not call {@code activeQExec.abort()} - presumably the running query then
+ * observes the shared signal itself; confirm.
+ */
+ public void abort() {
+ if (cancelSignal.compareAndSet(false, true)) {
+ synchronized (this) {
+ // If the change of the cancel signal happened here then abort
the activeQExec.
+ if (activeQExec != null) {
+ activeQExec.abort();
+ }
+ }
+ }
+ }
+ }
+
+    /**
+     * Registers {@code qExec} as the currently running query execution.
+     * <p>
+     * The method holds this worker's monitor, the same one {@code abort()} takes
+     * before it touches {@code activeQExec}; the former nested
+     * {@code synchronized (this)} block was redundant and has been dropped.
+     * If the cancel signal is already set, the execution is aborted at once.
+     *
+     * @param qExec the query execution about to run; {@code null} only clears the reference
+     */
+    private synchronized void setQExec(QueryExec qExec) {
+        this.activeQExec = qExec;
+        // Cancel the qExec immediately if the cancel signal is true.
+        if (qExec != null && cancelSignal.get()) {
+            qExec.abort();
+        }
}
@Override
@@ -528,7 +563,7 @@ public class UpdateEngineWorker implements UpdateVisitor
}
@SuppressWarnings("all")
- protected static Iterator<Binding> evalBindings(Query query, DatasetGraph
dsg, Binding inputBinding, Context context) {
+ protected Iterator<Binding> evalBindings(Query query, DatasetGraph dsg,
Binding inputBinding, Context context) {
// The UpdateProcessorBase already copied the context and made it safe
// ... but that's going to happen again :-(
if ( query == null ) {
@@ -536,8 +571,10 @@ public class UpdateEngineWorker implements UpdateVisitor
return Iter.singletonIterator(binding);
}
+ updateRemainingQueryTimeout(context);
+
// Not QueryExecDataset.dataset(...) because of initialBinding.
- QueryExecDatasetBuilder builder =
QueryExecDatasetBuilder.create().dataset(dsg).query(query);
+ QueryExecDatasetBuilder builder =
QueryExecDatasetBuilder.create().dataset(dsg).query(query).context(context);
if ( inputBinding != null ) {
// Must use initialBinding - it puts the input in the results,
unlike substitution.
builder.initialBinding(inputBinding);
@@ -545,9 +582,34 @@ public class UpdateEngineWorker implements UpdateVisitor
// builder.substitution(inputBinding);
}
QueryExec qExec = builder.build();
+ setQExec(qExec);
return qExec.select();
}
+    /**
+     * Derives the timeout for the next query run on behalf of this update and
+     * stores it in {@code context} via {@code Timeouts.setQueryTimeout}.
+     * <p>
+     * With an overall update timeout configured, the first call starts the clock
+     * and grants the full budget; later calls grant the budget minus the time
+     * elapsed since that first call, clamped at 0. With only an initial timeout
+     * configured, that value is passed through with no overall limit (-1).
+     * With neither, a {@code null} timeout is set.
+     *
+     * @param context the context the query will be built from; its query timeout is overwritten
+     */
+    private void updateRemainingQueryTimeout(Context context) {
+        Timeout finalTimeout = null;
+        if (timeout.hasOverallTimeout()) {
+            long remainingOverallTimeoutMillis;
+            if (startTimeMillis < 0) {
+                // First query of this update: start the clock, full budget available.
+                startTimeMillis = System.currentTimeMillis();
+                remainingOverallTimeoutMillis = timeout.overallTimeoutMillis();
+            } else {
+                long elapsedMillis = System.currentTimeMillis() - startTimeMillis;
+                // Subtract from the configured budget. The previous code subtracted from
+                // the -1 placeholder, which always produced a remaining time of 0.
+                remainingOverallTimeoutMillis = Math.max(0L, timeout.overallTimeoutMillis() - elapsedMillis);
+            }
+            finalTimeout = new Timeout(timeout.initialTimeoutMillis(), remainingOverallTimeoutMillis);
+        } else if (timeout.hasInitialTimeout()) {
+            finalTimeout = new Timeout(timeout.initialTimeoutMillis(), -1);
+        }
+
+        // Override any prior queryTimeout symbol with a fresh value computed from the configured updateTimeout.
+        Timeouts.setQueryTimeout(context, finalTimeout);
+    }
+
+
/**
* Execute.
* <br/>
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateProcessorBase.java
b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateProcessorBase.java
index e4b1b53c5e..8b738a9f12 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateProcessorBase.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateProcessorBase.java
@@ -18,9 +18,13 @@
package org.apache.jena.sparql.modify;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.jena.atlas.iterator.Iter ;
import org.apache.jena.sparql.core.DatasetGraph ;
import org.apache.jena.sparql.engine.binding.Binding ;
+import org.apache.jena.sparql.engine.Timeouts;
+import org.apache.jena.sparql.engine.Timeouts.Timeout;
import org.apache.jena.sparql.util.Context ;
import org.apache.jena.update.UpdateProcessor ;
import org.apache.jena.update.UpdateRequest ;
@@ -35,12 +39,23 @@ public class UpdateProcessorBase implements UpdateProcessor
protected final Binding inputBinding;
protected final UpdateEngineFactory factory ;
protected final Context context ;
+ protected final Timeout timeout ;
+
+ /**
+ * Creates a processor without an explicit timeout; delegates to the
+ * six-argument constructor passing {@code null} as the timeout.
+ *
+ * @deprecated Use the constructor that also takes a {@code Timeout}.
+ */
+ @Deprecated
+ public UpdateProcessorBase(UpdateRequest request,
+ DatasetGraph datasetGraph,
+ Binding inputBinding,
+ Context context,
+ UpdateEngineFactory factory) {
+ this(request, datasetGraph, inputBinding, context, factory, null);
+ }
public UpdateProcessorBase(UpdateRequest request,
DatasetGraph datasetGraph,
Binding inputBinding,
Context context,
- UpdateEngineFactory factory)
+ UpdateEngineFactory factory,
+ Timeout timeout)
{
this.request = request ;
this.datasetGraph = datasetGraph ;
@@ -48,6 +63,11 @@ public class UpdateProcessorBase implements UpdateProcessor
this.context = context;
Context.setCurrentDateTime(this.context) ;
this.factory = factory ;
+ this.timeout = timeout;
+ Context.getOrSetCancelSignal(this.context) ;
+ if (timeout != null) {
+ Timeouts.setUpdateTimeout(context, timeout);
+ }
}
@Override
@@ -55,6 +75,7 @@ public class UpdateProcessorBase implements UpdateProcessor
UpdateEngine uProc = factory.create(datasetGraph, inputBinding,
context);
uProc.startRequest();
+ // context.get(ARQ.updateTimeout);
try {
UpdateSink sink = uProc.getUpdateSink();
Iter.sendToSink(request.iterator(), sink); // Will call close
on sink if there are no exceptions
@@ -62,4 +83,19 @@ public class UpdateProcessorBase implements UpdateProcessor
uProc.finishRequest() ;
}
}
+
+ @Override
+ public Context getContext() {
+ return context;
+ }
+
+ @Override
+ public void abort() {
+ // Right now abort is only signaled via the context's cancel signal.
+ // An improvement might be introducing UpdateEngine.abort().
+ AtomicBoolean cancelSignal = Context.getCancelSignal(context);
+ if (cancelSignal != null) {
+ cancelSignal.set(true);
+ }
+ }
}
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/util/Context.java
b/jena-arq/src/main/java/org/apache/jena/sparql/util/Context.java
index 9521ea3f7d..63fd6e8da0 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/util/Context.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/util/Context.java
@@ -20,9 +20,11 @@ package org.apache.jena.sparql.util;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.apache.jena.atlas.lib.Lib;
+import org.apache.jena.atlas.logging.Log;
import org.apache.jena.query.ARQ;
import org.apache.jena.sparql.ARQConstants;
import org.apache.jena.sparql.ARQException;
@@ -418,6 +420,26 @@ public class Context {
context.set(ARQConstants.sysCurrentTime,
NodeFactoryExtra.nowAsDateTime());
}
+    /**
+     * Looks up the cancel signal stored in a context under
+     * {@code ARQConstants.symCancelQuery}.
+     *
+     * @param context the context to inspect; may be {@code null}
+     * @return the stored signal, or {@code null} when the context is {@code null},
+     *         holds no value for the symbol, or holds a value that is not an
+     *         {@code AtomicBoolean} (the latter is logged as an error)
+     */
+    public static AtomicBoolean getCancelSignal(Context context) {
+        if ( context == null )
+            return null;
+        // Read the raw value and test its type explicitly instead of relying on
+        // a ClassCastException to detect a wrongly typed entry.
+        Object value = context.get(ARQConstants.symCancelQuery);
+        if ( value == null )
+            return null;
+        if ( value instanceof AtomicBoolean )
+            return (AtomicBoolean)value;
+        Log.error(Context.class, "Expected AtomicBoolean for cancel control but got: "+value.getClass().getName());
+        return null;
+    }
+
+    /**
+     * Returns the cancel signal of a context, installing a fresh signal
+     * (initially {@code false}) under {@code ARQConstants.symCancelQuery} when
+     * {@code getCancelSignal} finds none.
+     *
+     * @param context the context to read and, if needed, update; must not be {@code null}
+     * @return the existing or newly installed signal, never {@code null}
+     * @throws NullPointerException if {@code context} is {@code null}
+     */
+    public static AtomicBoolean getOrSetCancelSignal(Context context) {
+        // Fail fast with a clear message; a null context has nowhere to store the signal.
+        Objects.requireNonNull(context, "context");
+        AtomicBoolean cancelSignal = getCancelSignal(context);
+        if (cancelSignal == null) {
+            cancelSignal = new AtomicBoolean(false);
+            context.set(ARQConstants.symCancelQuery, cancelSignal);
+        }
+        return cancelSignal;
+    }
+
/** Merge an outer (defaults to the system global context)
* and local context to produce a new context
* The new context is always a separate copy.
diff --git
a/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionBuilder.java
b/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionBuilder.java
index 0a294e3c22..3672c15b8b 100644
--- a/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionBuilder.java
+++ b/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionBuilder.java
@@ -18,6 +18,8 @@
package org.apache.jena.update;
+import java.util.concurrent.TimeUnit;
+
import org.apache.jena.query.QuerySolution;
import org.apache.jena.rdf.model.RDFNode;
import org.apache.jena.sparql.util.Context;
@@ -47,6 +49,10 @@ public interface UpdateExecutionBuilder {
public UpdateExecutionBuilder substitution(String varName, RDFNode value);
+ public UpdateExecutionBuilder timeout(long value, TimeUnit timeUnit);
+
+ /** Sets the timeout in milliseconds; shorthand for {@code timeout(value, TimeUnit.MILLISECONDS)}. */
+ public default UpdateExecutionBuilder timeout(long value) { return
timeout(value, TimeUnit.MILLISECONDS); }
+
public UpdateExecution build();
/** Build and execute */
diff --git
a/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionDatasetBuilder.java
b/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionDatasetBuilder.java
index 63efc743f4..b0ce8acac2 100644
---
a/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionDatasetBuilder.java
+++
b/jena-arq/src/main/java/org/apache/jena/update/UpdateExecutionDatasetBuilder.java
@@ -18,6 +18,8 @@
package org.apache.jena.update;
+import java.util.concurrent.TimeUnit;
+
import org.apache.jena.graph.Node;
import org.apache.jena.query.Dataset;
import org.apache.jena.query.QueryExecution;
@@ -131,6 +133,12 @@ public class UpdateExecutionDatasetBuilder implements
UpdateExecutionBuilder {
return this;
}
+ /**
+ * Passes the timeout on to the wrapped builder and returns this builder for
+ * chaining. The value returned by the wrapped builder is not used.
+ */
+ @Override
+ public UpdateExecutionBuilder timeout(long value, TimeUnit timeUnit) {
+ builder.timeout(value, timeUnit);
+ return this;
+ }
+
@Override
public UpdateExecution build() {
UpdateExec exec = builder.build();
diff --git a/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java
b/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java
index 71ffb5a63c..49ede54058 100644
--- a/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java
+++ b/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java
@@ -18,6 +18,8 @@
package org.apache.jena.update;
+import org.apache.jena.sparql.util.Context;
+
/**
* An instance of a execution of an UpdateRequest.
* Applies to UpdateExec (GPI) and UpdateExecution (API).
@@ -26,4 +28,10 @@ public interface UpdateProcessor
{
/** Execute */
public void execute() ;
+
+ /** Attempt to asynchronously abort an update execution. */
+ public default void abort() { }
+
+ /** Returns the processor's context. Null if there is none. */
+ public default Context getContext() { return null ; }
}
diff --git
a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java
b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java
index 8426349e70..7ef22a3a49 100644
---
a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java
+++
b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.IntStream;
@@ -42,13 +43,21 @@ import org.apache.jena.rdf.model.Model ;
import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.rdf.model.Property ;
import org.apache.jena.rdf.model.Resource ;
+import org.apache.jena.sparql.ARQConstants;
import org.apache.jena.sparql.core.DatasetGraph;
import org.apache.jena.sparql.core.DatasetGraphFactory;
+import org.apache.jena.sparql.engine.ExecutionContext;
import org.apache.jena.sparql.exec.QueryExec;
+import org.apache.jena.sparql.exec.QueryExecBuilder;
+import org.apache.jena.sparql.expr.NodeValue;
+import org.apache.jena.sparql.function.FunctionBase0;
+import org.apache.jena.sparql.function.FunctionEnv;
import org.apache.jena.sparql.function.FunctionRegistry ;
import org.apache.jena.sparql.function.library.wait ;
import org.apache.jena.sparql.graph.GraphFactory ;
import org.apache.jena.sparql.sse.SSE;
+import org.apache.jena.sparql.util.Context;
+import org.apache.jena.sparql.util.Symbol;
import org.junit.AfterClass ;
import org.junit.Assert;
import org.junit.BeforeClass ;
@@ -228,6 +237,73 @@ public class TestQueryExecutionCancel {
cancellationTest("JSON {\":a\": \"b\"} WHERE {}",
exec->exec.execJson().get(0));
}
+ /** Set cancel signal function via {@link QueryExecBuilder#set(Symbol,
Object)}. */
+ @Test
+ public void test_cancel_signal_1() {
+ DatasetGraph dsg = DatasetGraphFactory.create();
+ FunctionRegistry fnReg = registerCancelSignalFunction(new
FunctionRegistry());
+ try (QueryExec qe = QueryExec.dataset(dsg).query("SELECT
(<urn:cancelSignal>() AS ?foobar) { }")
+ .set(ARQConstants.registryFunctions, fnReg)
+ .build()) {
+ Assert.assertEquals(1,
ResultSetFormatter.consume(ResultSet.adapt(qe.select())));
+ }
+ }
+
+ /** Set cancel signal function via {@link
QueryExecBuilder#context(Context)}. */
+ @Test
+ public void test_cancel_signal_2() {
+ DatasetGraph dsg = DatasetGraphFactory.create();
+ Context cxt = ARQ.getContext().copy();
+ FunctionRegistry fnReg = registerCancelSignalFunction(new
FunctionRegistry());
+ FunctionRegistry.set(cxt, fnReg);
+ try (QueryExec qe = QueryExec.dataset(dsg).query("SELECT
(<urn:cancelSignal>() AS ?foobar) { }").context(cxt).build()) {
+ Assert.assertEquals(1,
ResultSetFormatter.consume(ResultSet.adapt(qe.select())));
+ }
+ }
+
+    /** Set cancel signal function via {@link QueryExec#getContext()}. */
+    @Test
+    public void test_cancel_signal_3() {
+        DatasetGraph dsg = DatasetGraphFactory.create();
+        try (QueryExec qe = QueryExec.dataset(dsg).query("SELECT (<urn:cancelSignal>() AS ?foobar) { }").build()) {
+            // The registry is installed on the already built execution's context.
+            FunctionRegistry fnReg = registerCancelSignalFunction(new FunctionRegistry());
+            FunctionRegistry.set(qe.getContext(), fnReg);
+            // As in test_cancel_signal_1 and _2: the query must yield exactly one row.
+            Assert.assertEquals(1, ResultSetFormatter.consume(ResultSet.adapt(qe.select())));
+        }
+    }
+
+ /** Registers the function <urn:cancelSignal> which returns its value if
present.
+ * A RuntimeException is raised if there is no cancel signal in the
execution context. */
+ static FunctionRegistry registerCancelSignalFunction(FunctionRegistry
fnReg) {
+ fnReg.put("urn:cancelSignal", iri -> new FunctionBase0() {
+ @Override
+ protected NodeValue exec(List<NodeValue> args, FunctionEnv env) {
+ ExecutionContext execCxt = (ExecutionContext)env;
+ AtomicBoolean cancelSignal = execCxt.getCancelSignal();
+ if (cancelSignal == null) {
+ throw new RuntimeException("No cancel signal in execution
context.");
+ }
+ return NodeValue.makeBoolean(cancelSignal.get());
+ }
+
+ @Override
+ public NodeValue exec() {
+ throw new IllegalStateException("Should never be called");
+ }
+ });
+
+ return fnReg;
+ }
+
+ /** Create a model with 1000 triples. */
+ static Graph createTestGraph() {
+ Graph graph = GraphFactory.createDefaultGraph();
+ IntStream.range(0, 1000)
+ .mapToObj(i -> NodeFactory.createURI("http://www.example.org/r" +
i))
+ .forEach(node -> graph.add(node, node, node));
+ return graph;
+ }
+
static <T> void cancellationTest(String queryString, Function<QueryExec,
Iterator<T>> itFactory, Consumer<Iterator<T>> itConsumer) {
cancellationTest(queryString, itFactory::apply);
cancellationTestForIterator(queryString, itFactory, itConsumer);
@@ -244,7 +320,7 @@ public class TestQueryExecutionCancel {
}
/** Obtain an iterator and only afterwards abort the query exec.
- * Operations on the iterator are now expected to fail. */
+ * Operations on the iterator are now expected to fail. */
static <T> void cancellationTestForIterator(String queryString,
Function<QueryExec, Iterator<T>> itFactory, Consumer<Iterator<T>> itConsumer) {
DatasetGraph dsg = DatasetGraphFactory.createTxnMem();
dsg.add(SSE.parseQuad("(_ :s :p :o)"));
@@ -255,10 +331,8 @@ public class TestQueryExecutionCancel {
}
}
- /**
- * Test that creates iterators over a billion result rows and attempts to
cancel them.
- * If this test hangs then it is likely that something went wrong in the
cancellation machinery.
- */
+ /** Test that creates iterators over a billion result rows and attempts to
cancel them.
+ * If this test hangs then it is likely that something went wrong in the
cancellation machinery. */
@Test(timeout = 10000)
public void test_cancel_concurrent_1() {
int maxCancelDelayInMillis = 100;
@@ -268,11 +342,7 @@ public class TestQueryExecutionCancel {
int taskCount = cpuCount * 10;
// Create a model with 1000 triples
- Graph graph = GraphFactory.createDefaultGraph();
- IntStream.range(0, 1000)
- .mapToObj(i -> NodeFactory.createURI("http://www.example.org/r" +
i))
- .forEach(node -> graph.add(node, node, node));
- Model model = ModelFactory.createModelForGraph(graph);
+ Model model = ModelFactory.createModelForGraph(createTestGraph());
// Create a query that creates 3 cross joins - resulting in one
billion result rows
Query query = QueryFactory.create("SELECT * { ?a ?b ?c . ?d ?e ?f . ?g
?h ?i . }");
@@ -289,10 +359,8 @@ public class TestQueryExecutionCancel {
}
}
- /**
- * Reusable method that creates a parallel stream that starts query
executions
- * and schedules cancel tasks on a separate thread pool.
- */
+ /** Reusable method that creates a parallel stream that starts query
executions
+ * and schedules cancel tasks on a separate thread pool. */
public static void runConcurrentAbort(int taskCount, int maxCancelDelay,
Callable<QueryExecution> qeFactory, Function<QueryExecution, ?> processor) {
Random cancelDelayRandom = new Random();
ExecutorService executorService = Executors.newCachedThreadPool();
diff --git
a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestTimeouts.java
b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestTimeouts.java
new file mode 100644
index 0000000000..dcc65d61e3
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestTimeouts.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.api;
+
+import org.apache.jena.sparql.engine.Timeouts;
+import org.apache.jena.sparql.engine.Timeouts.Timeout;
+import org.junit.Assert;
+import org.junit.Test;
+
+ /** Round-trip tests for {@link Timeout} values: each timeout is converted to a
+  * context value, parsed back and then rendered as a string. */
+ public class TestTimeouts {
+     /** The unset timeout is rendered as {@code null}. */
+     @Test
+     public void testUnset() {
+         Timeout parsed = roundtrip(Timeout.UNSET);
+         Assert.assertNull(Timeouts.toString(parsed));
+     }
+
+     /** {@code new Timeout(6, -1)} is rendered as "6,-1". */
+     @Test
+     public void testInitialTimeout() {
+         Timeout parsed = roundtrip(new Timeout(6, -1));
+         Assert.assertEquals("6,-1", Timeouts.toString(parsed));
+     }
+
+     /** {@code new Timeout(-1, 6)} is rendered as "6". */
+     @Test
+     public void testOverallTimeout() {
+         Timeout parsed = roundtrip(new Timeout(-1, 6));
+         Assert.assertEquals("6", Timeouts.toString(parsed));
+     }
+
+     /** {@code new Timeout(6, 6)} is rendered as "6,6". */
+     @Test
+     public void testInitialAndOverallTimeout() {
+         Timeout parsed = roundtrip(new Timeout(6, 6));
+         Assert.assertEquals("6,6", Timeouts.toString(parsed));
+     }
+
+     /** Converts the timeout to a context value, parses that value back and
+      * asserts that the parsed timeout equals the input.
+      *
+      * @param timeout the timeout to convert
+      * @return the timeout obtained from parsing the context value */
+     public static Timeout roundtrip(Timeout timeout) {
+         Timeout parsed = Timeouts.parseTimeout(Timeouts.toContextValue(timeout));
+         Assert.assertEquals(timeout, parsed);
+         return parsed;
+     }
+ }
diff --git
a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestUpdateExecutionCancel.java
b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestUpdateExecutionCancel.java
new file mode 100644
index 0000000000..386e45f7e2
--- /dev/null
+++
b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestUpdateExecutionCancel.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.api;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.jena.atlas.iterator.Iter;
+import org.apache.jena.graph.Graph;
+import org.apache.jena.query.ARQ;
+import org.apache.jena.query.Dataset;
+import org.apache.jena.query.DatasetFactory;
+import org.apache.jena.query.QueryCancelledException;
+import org.apache.jena.sparql.ARQConstants;
+import org.apache.jena.sparql.core.DatasetGraph;
+import org.apache.jena.sparql.core.DatasetGraphFactory;
+import org.apache.jena.sparql.exec.UpdateExec;
+import org.apache.jena.sparql.exec.UpdateExecBuilder;
+import org.apache.jena.sparql.function.FunctionRegistry;
+import org.apache.jena.sparql.util.Context;
+import org.apache.jena.sparql.util.Symbol;
+import org.apache.jena.update.UpdateExecutionFactory;
+import org.apache.jena.update.UpdateFactory;
+import org.apache.jena.update.UpdateProcessor;
+import org.apache.jena.update.UpdateRequest;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestUpdateExecutionCancel {
+
+ /** Registers the cancel signal function through {@link UpdateExecBuilder#set(Symbol, Object)}
+  * and checks that {@code find()} on the dataset returns exactly one entry after the update. */
+ @Test
+ public void test_cancel_signal_1() {
+     FunctionRegistry registry = TestQueryExecutionCancel.registerCancelSignalFunction(new FunctionRegistry());
+     DatasetGraph target = DatasetGraphFactory.create();
+     UpdateExec.dataset(target)
+         .update("INSERT { <s> <p> ?o } WHERE { BIND(<urn:cancelSignal>() AS ?o) }")
+         .set(ARQConstants.registryFunctions, registry)
+         .build()
+         .execute();
+     long count = Iter.count(target.find());
+     Assert.assertEquals(1, count);
+ }
+
+ /** Registers the cancel signal function in a copy of the ARQ context that is passed to
+  * {@link UpdateExecBuilder#context(Context)} and checks that {@code find()} on the
+  * dataset returns exactly one entry after the update. */
+ @Test
+ public void test_cancel_signal_2() {
+     Context context = ARQ.getContext().copy();
+     FunctionRegistry.set(context, TestQueryExecutionCancel.registerCancelSignalFunction(new FunctionRegistry()));
+     DatasetGraph target = DatasetGraphFactory.create();
+     UpdateExec.dataset(target)
+         .update("INSERT { <s> <p> ?o } WHERE { BIND(<urn:cancelSignal>() AS ?o) }")
+         .context(context)
+         .build()
+         .execute();
+     long count = Iter.count(target.find());
+     Assert.assertEquals(1, count);
+ }
+
+ /** Registers the cancel signal function on the context obtained from
+  * {@link UpdateExec#getContext()} of an already built update execution and checks
+  * that {@code find()} on the dataset returns exactly one entry after the update. */
+ @Test
+ public void test_cancel_signal_3() {
+     DatasetGraph target = DatasetGraphFactory.create();
+     UpdateExec exec = UpdateExec.dataset(target)
+         .update("INSERT { <s> <p> ?o } WHERE { BIND(<urn:cancelSignal>() AS ?o) }")
+         .build();
+     // The registry is installed only after build().
+     FunctionRegistry registry = TestQueryExecutionCancel.registerCancelSignalFunction(new FunctionRegistry());
+     FunctionRegistry.set(exec.getContext(), registry);
+     exec.execute();
+     long count = Iter.count(target.find());
+     Assert.assertEquals(1, count);
+ }
+
+ /** An update request with a timeout of 50 milliseconds is expected to fail
+  * with a {@link QueryCancelledException}. */
+ @Test(expected = QueryCancelledException.class, timeout = 5000)
+ public void test_update_cancel_1() {
+     Graph graph = TestQueryExecutionCancel.createTestGraph();
+     // The WHERE clause of the insert forms 3 cross joins over the 1000 triples
+     // of the test graph. This would result in one billion result rows.
+     // The no-op delete followed by the insert indirectly tests that the
+     // timeout is applied to the overall update request.
+     String updateString = "DELETE { <s> <p> <o> } WHERE { ?a ?b ?c } ; INSERT { <s> <p> <o> } WHERE { ?a ?b ?c . ?d ?e ?f . ?g ?h ?i . }";
+     UpdateExec exec = UpdateExec
+         .dataset(graph)
+         .update(updateString)
+         .timeout(50, TimeUnit.MILLISECONDS)
+         .build();
+     exec.execute();
+ }
+
+ /** Test that starts updates whose WHERE clause would produce a billion result rows
+  * and attempts to cancel them.
+  * If this test hangs then it is likely that something went wrong in the cancellation machinery. */
+ @Test(timeout = 10000)
+ public void test_cancel_concurrent_1() {
+     int maxCancelDelayInMillis = 100;
+
+     // Spend at most roughly 1 second per cpu (10 tasks of at most 100ms each)
+     int taskCount = Runtime.getRuntime().availableProcessors() * 10;
+
+     // Create a dataset over a graph with 1000 triples
+     Graph graph = TestQueryExecutionCancel.createTestGraph();
+     DatasetGraph dsg = DatasetGraphFactory.wrap(graph);
+     Dataset dataset = DatasetFactory.wrap(dsg);
+
+     // Create an update with 3 cross joins in its WHERE clause - resulting in one billion result rows
+     UpdateRequest updateRequest = UpdateFactory.create("INSERT { <s> <p> <o> } WHERE { ?a ?b ?c . ?d ?e ?f . ?g ?h ?i . }");
+     Callable<UpdateProcessor> upFactory = () -> UpdateExecutionFactory.create(updateRequest, dataset);
+
+     runConcurrentAbort(taskCount, maxCancelDelayInMillis, upFactory);
+ }
+
+ /** Reusable method that uses a parallel stream to start update executions on a
+  * separate thread pool and to abort each of them after a random delay.
+  * If waiting for an execution fails then the cause is asserted to be a
+  * {@link QueryCancelledException}.
+  *
+  * @param taskCount number of update executions to start
+  * @param maxCancelDelay exclusive upper bound in milliseconds of the random delay before an abort
+  * @param upFactory creates the update processor for each task */
+ public static void runConcurrentAbort(int taskCount, int maxCancelDelay, Callable<UpdateProcessor> upFactory) {
+     Random cancelDelayRandom = new Random();
+     ExecutorService executorService = Executors.newCachedThreadPool();
+     try {
+         List<Integer> list = IntStream.range(0, taskCount).boxed().collect(Collectors.toList());
+         list
+             .parallelStream()
+             .forEach(i -> {
+                 UpdateProcessor up;
+                 try {
+                     up = upFactory.call();
+                 } catch (Exception e) {
+                     throw new RuntimeException("Failed to build an update execution", e);
+                 }
+                 Future<?> future = executorService.submit(() -> up.execute());
+                 int delayToAbort = cancelDelayRandom.nextInt(maxCancelDelay);
+                 try {
+                     Thread.sleep(delayToAbort);
+                 } catch (InterruptedException e) {
+                     // Restore the interrupt flag before propagating the failure.
+                     Thread.currentThread().interrupt();
+                     throw new RuntimeException(e);
+                 }
+                 up.abort();
+                 try {
+                     future.get();
+                 } catch (ExecutionException e) {
+                     Throwable cause = e.getCause();
+                     if (!(cause instanceof QueryCancelledException)) {
+                         // Unexpected exception - print out the stack trace
+                         e.printStackTrace();
+                     }
+                     // A missing cause is reported as an assertion failure rather than an NPE.
+                     Assert.assertEquals(QueryCancelledException.class, cause == null ? null : cause.getClass());
+                 } catch (InterruptedException e) {
+                     // Stop waiting for this task but keep the interrupt visible to the caller.
+                     Thread.currentThread().interrupt();
+                 }
+             });
+     } finally {
+         executorService.shutdownNow();
+     }
+ }
+}
diff --git
a/jena-fuseki2/jena-fuseki-main/src/test/java/org/apache/jena/fuseki/main/TestSPARQLProtocolTimeout.java
b/jena-fuseki2/jena-fuseki-main/src/test/java/org/apache/jena/fuseki/main/TestSPARQLProtocolTimeout.java
new file mode 100644
index 0000000000..ee80f16159
--- /dev/null
+++
b/jena-fuseki2/jena-fuseki-main/src/test/java/org/apache/jena/fuseki/main/TestSPARQLProtocolTimeout.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.fuseki.main;
+
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import org.apache.jena.atlas.web.HttpException;
+import org.apache.jena.graph.Graph;
+import org.apache.jena.graph.NodeFactory;
+import org.apache.jena.sparql.exec.http.GSP;
+import org.apache.jena.sparql.graph.GraphFactory;
+import org.apache.jena.sparql.util.Convert;
+import org.apache.jena.update.UpdateExecution;
+import org.junit.Before;
+import org.junit.Test;
+
+ /** Tests for timeouts of SPARQL protocol requests issued against the Fuseki test server. */
+ public class TestSPARQLProtocolTimeout extends AbstractFusekiTest
+ {
+     /** PUTs the test graph to the default graph of the GSP service before each test. */
+     @Before
+     public void before() {
+         GSP.service(serviceGSP()).defaultGraph().PUT(createTestGraph());
+     }
+
+     /** Create a graph with 1000 triples. Same method as in {@link org.apache.jena.sparql.api.TestQueryExecutionCancel}. */
+     static Graph createTestGraph() {
+         Graph result = GraphFactory.createDefaultGraph();
+         IntStream.range(0, 1000)
+             .mapToObj(idx -> "http://www.example.org/r" + idx)
+             .map(NodeFactory::createURI)
+             .forEach(n -> result.add(n, n, n));
+         return result;
+     }
+
+     /** Builds a URL by appending the form-encoded query string as the {@code query} parameter to the base. */
+     static String query(String base, String queryString) {
+         String encoded = Convert.encWWWForm(queryString);
+         return base + "?query=" + encoded;
+     }
+
+     /** If the HTTP client reaches its timeout and disconnects from the server then it is up
+      * to the server whether it will cancel or complete the started SPARQL update execution.
+      * The client side is expected to fail with an {@link HttpException}. */
+     @Test(expected = HttpException.class)
+     public void update_timeout_01() {
+         String updateString = "INSERT { } WHERE { ?a ?b ?c . ?d ?e ?f . ?g ?h ?i . }";
+         UpdateExecution.service(serviceUpdate())
+             .update(updateString)
+             .timeout(500, TimeUnit.MILLISECONDS)
+             .execute();
+     }
+ }
diff --git
a/jena-integration-tests/src/test/java/org/apache/jena/test/rdfconnection/TestRDFConnectionRemote.java
b/jena-integration-tests/src/test/java/org/apache/jena/test/rdfconnection/TestRDFConnectionRemote.java
index 20dd3d2bd6..75f19f3836 100644
---
a/jena-integration-tests/src/test/java/org/apache/jena/test/rdfconnection/TestRDFConnectionRemote.java
+++
b/jena-integration-tests/src/test/java/org/apache/jena/test/rdfconnection/TestRDFConnectionRemote.java
@@ -28,6 +28,8 @@ import org.apache.jena.query.QueryExecution;
import org.apache.jena.query.QueryParseException;
import org.apache.jena.query.ResultSet;
import org.apache.jena.rdf.model.Model;
+import org.apache.jena.rdf.model.RDFNode;
+import org.apache.jena.rdf.model.ResourceFactory;
import org.apache.jena.rdfconnection.AbstractTestRDFConnection;
import org.apache.jena.rdfconnection.RDFConnection;
import org.apache.jena.rdfconnection.RDFConnectionRemote;
@@ -228,11 +230,27 @@ public class TestRDFConnectionRemote extends
AbstractTestRDFConnection {
/** Standard update syntax with substitution should work. */
@Test
public void standard_syntax_update_remote_1a() {
+ RDFNode FALSE = ResourceFactory.createTypedLiteral(false);
try ( RDFConnection conn =
RDFConnectionRemote.service(server.datasetURL("/ds")).parseCheckSPARQL(false).build()
) {
conn.newUpdate()
- .update("INSERT DATA { <a> <b> <c> }")
- .update("INSERT DATA { <x> <y> <z> }")
- .substitution("foo", RDF.type).build();
+ .update("INSERT { <a> <b> <c> } WHERE { FILTER(?foo) }")
+ .update("INSERT { <x> <y> <z> } WHERE { FILTER(?foo) }")
+ .substitution("foo", FALSE)
+ .build()
+ .execute();
+ }
+ }
+
+ /** Standard update syntax should work when the update strings contain
comments. */
+ @Test
+ public void standard_syntax_update_remote_2b() {
+ try ( RDFConnection conn =
RDFConnectionRemote.service(server.datasetURL("/ds")).parseCheckSPARQL(false).build()
) {
+ conn.newUpdate()
+ .update("# <update1>\n INSERT { <a> <urn:b> <c> } WHERE {
FILTER(false) } # </update1>")
+ .update("# <update2>\n INSERT { <d> <urn:e> <f> } WHERE {
FILTER(false) } # </update2>")
+ .update("# <update3>\n INSERT { <g> <urn:h> <i> } WHERE {
FILTER(false) } # </update3>")
+ .build()
+ .execute();
}
}