This is an automated email from the ASF dual-hosted git repository.
andy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/jena.git
The following commit(s) were added to refs/heads/main by this push:
new f90380630f GH-3471: Async HTTP
f90380630f is described below
commit f90380630f11965276816edecc1cbbdf7c6dfa72
Author: Claus Stadler <[email protected]>
AuthorDate: Wed Oct 1 17:24:34 2025 +0200
GH-3471: Async HTTP
---
.../org/apache/jena/atlas/web/HttpException.java | 11 +-
.../java/org/apache/jena/http/AsyncHttpRDF.java | 44 +++++--
.../main/java/org/apache/jena/http/HttpLib.java | 107 +++++++++-------
.../java/org/apache/jena/http/auth/AuthLib.java | 43 +++++--
.../jena/sparql/exec/http/QueryExecHTTP.java | 142 ++++++++++++++++-----
.../org/apache/jena/sparql/exec/http/Service.java | 6 +-
.../jena/sparql/exec/http/UpdateExecHTTP.java | 16 ++-
.../org/apache/jena/update/UpdateProcessor.java | 13 ++
8 files changed, 273 insertions(+), 109 deletions(-)
diff --git
a/jena-arq/src/main/java/org/apache/jena/atlas/web/HttpException.java
b/jena-arq/src/main/java/org/apache/jena/atlas/web/HttpException.java
index 5597e80947..23edb47be6 100644
--- a/jena-arq/src/main/java/org/apache/jena/atlas/web/HttpException.java
+++ b/jena-arq/src/main/java/org/apache/jena/atlas/web/HttpException.java
@@ -34,14 +34,15 @@ public class HttpException extends RuntimeException {
}
public HttpException(int statusCode, String statusLine) {
- super(exMessage(statusCode, statusLine));
- this.statusCode = statusCode;
- this.statusLine = statusLine ;
- this.response = null;
+ this(statusCode, statusLine, null, null);
}
public HttpException(int statusCode, String statusLine, String
responseMessage) {
- super(exMessage(statusCode, statusLine));
+ this(statusCode, statusLine, responseMessage, null);
+ }
+
+ public HttpException(int statusCode, String statusLine, String
responseMessage, Throwable cause) {
+ super(exMessage(statusCode, statusLine), cause);
this.statusCode = statusCode;
this.statusLine = statusLine ;
this.response = responseMessage;
diff --git a/jena-arq/src/main/java/org/apache/jena/http/AsyncHttpRDF.java
b/jena-arq/src/main/java/org/apache/jena/http/AsyncHttpRDF.java
index 5eeee4429b..b84df0075d 100644
--- a/jena-arq/src/main/java/org/apache/jena/http/AsyncHttpRDF.java
+++ b/jena-arq/src/main/java/org/apache/jena/http/AsyncHttpRDF.java
@@ -160,26 +160,50 @@ public class AsyncHttpRDF {
* This operation extracts RuntimeException from the {@code
CompletableFuture}.
*/
public static <T> T getOrElseThrow(CompletableFuture<T> cf) {
+ return getOrElseThrow(cf, null);
+ }
+
+ /**
+ * Get the value of a {@link CompletableFuture} that executes an HTTP
request.
+ * In case of any error, an {@link HttpException} is thrown.
+ *
+ * @param <T> The type of the value being computed.
+ * @param cf The completable future.
+ * @param httpRequest An optional HttpRequest for improving feedback in
case of exceptions.
+ * @return The value computed by the completable future.
+ */
+ public static <T> T getOrElseThrow(CompletableFuture<T> cf, HttpRequest
httpRequest) {
Objects.requireNonNull(cf);
try {
return cf.join();
//} catch (CancellationException ex1) { // Let this pass out.
} catch (CompletionException ex) {
- if ( ex.getCause() != null ) {
- Throwable cause = ex.getCause();
- if ( cause instanceof RuntimeException )
- throw (RuntimeException)cause;
+ Throwable cause = ex.getCause();
+ if ( cause != null ) {
+
+ // Pass on our own HttpException instances such as 401
Unauthorized.
+ if ( cause instanceof HttpException httpEx ) {
+ throw new HttpException(httpEx.getStatusCode(),
httpEx.getStatusLine(), httpEx.getResponse(), cause);
+ }
+
+ final String msg = cause.getMessage();
+
if ( cause instanceof IOException ) {
- IOException iox = (IOException)cause;
// Rather than an HTTP exception, bad authentication
becomes IOException("too many authentication attempts");
- if ( iox.getMessage().contains("too many authentication
attempts") ||
- iox.getMessage().contains("No credentials
provided") ) {
- throw new HttpException(401, HttpSC.getMessage(401));
+ if ( msg != null &&
+ ( msg.contains("too many authentication attempts")
||
+ msg.contains("No credentials provided") ) ) {
+ throw new HttpException(401, HttpSC.getMessage(401),
null, cause);
+ }
+ if (httpRequest != null) {
+ throw new HttpException(httpRequest.method()+"
"+httpRequest.uri().toString(), cause);
}
- IO.exception((IOException)cause);
}
+
+ throw new HttpException(msg, cause);
}
- throw ex;
+ // Note: CompletionException without cause should never happen.
+ throw new HttpException(ex);
}
}
diff --git a/jena-arq/src/main/java/org/apache/jena/http/HttpLib.java
b/jena-arq/src/main/java/org/apache/jena/http/HttpLib.java
index 1895105624..56205d9619 100644
--- a/jena-arq/src/main/java/org/apache/jena/http/HttpLib.java
+++ b/jena-arq/src/main/java/org/apache/jena/http/HttpLib.java
@@ -37,6 +37,7 @@ import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -560,7 +561,7 @@ public class HttpLib {
* @return HttpResponse
*/
public static HttpResponse<InputStream> execute(HttpClient httpClient,
HttpRequest httpRequest) {
- return execute(httpClient, httpRequest, BodyHandlers.ofInputStream());
+ return AsyncHttpRDF.getOrElseThrow(executeAsync(httpClient,
httpRequest, BodyHandlers.ofInputStream()), httpRequest);
}
/**
@@ -580,13 +581,35 @@ public class HttpLib {
*
* @param httpClient
* @param httpRequest
- * @param bodyHandler
* @return HttpResponse
*/
- /*package*/ static <X> HttpResponse<X> execute(HttpClient httpClient,
HttpRequest httpRequest, BodyHandler<X> bodyHandler) {
+ public static CompletableFuture<HttpResponse<InputStream>>
executeAsync(HttpClient httpClient, HttpRequest httpRequest) {
+ return executeAsync(httpClient, httpRequest,
BodyHandlers.ofInputStream());
+ }
+
+ /**
+ * Execute a request, return a {@code HttpResponse<X>} which
+ * can be passed to {@link #handleHttpStatusCode(HttpResponse)} which will
+ * convert non-2xx status code to {@link HttpException HttpExceptions}.
+ * <p>
+ * This function applies the HTTP authentication challenge support
+ * and will repeat the request if necessary with added authentication.
+ * <p>
+ * See {@link AuthEnv} for authentication registration.
+ * <br/>
+ * See {@link #executeJDK} to execute exactly once without challenge
response handling.
+ *
+ * @see AuthEnv AuthEnv for authentication registration
+ * @see #executeJDK executeJDK to execute exactly once.
+ *
+ * @param httpClient
+ * @param httpRequest
+ * @param bodyHandler
+ * @return HttpResponse
+ */ /*package*/ static <X> CompletableFuture<HttpResponse<X>>
executeAsync(HttpClient httpClient, HttpRequest httpRequest, BodyHandler<X>
bodyHandler) {
// To run with no jena-supplied authentication handling.
if ( false )
- return executeJDK(httpClient, httpRequest, bodyHandler);
+ return executeJDKAsync(httpClient, httpRequest, bodyHandler);
URI uri = httpRequest.uri();
URI key = null;
@@ -602,29 +625,16 @@ public class HttpLib {
authEnv.registerUsernamePassword(key, userpasswd[0],
userpasswd[1]);
}
}
- try {
- return AuthLib.authExecute(httpClient, httpRequest, bodyHandler);
- } finally {
- if ( key != null )
- // The AuthEnv is "per tenant".
- // Temporary registration within the AuthEnv of the
- // user:password is acceptable.
- authEnv.unregisterUsernamePassword(key);
- }
- }
- /**
- * Execute request and return a {@code HttpResponse<InputStream>} response.
- * Status codes have not been handled. The response can be passed to
- * {@link #handleResponseInputStream(HttpResponse)} which will convert
non-2xx
- * status code to {@link HttpException HttpExceptions}.
- *
- * @param httpClient
- * @param httpRequest
- * @return HttpResponse
- */
- public static HttpResponse<InputStream> executeJDK(HttpClient httpClient,
HttpRequest httpRequest) {
- return execute(httpClient, httpRequest, BodyHandlers.ofInputStream());
+ URI finalKey = key;
+ return AuthLib.authExecuteAsync(httpClient, httpRequest, bodyHandler)
+ .whenComplete((httpResponse, throwable) -> {
+ if ( finalKey != null )
+ // The AuthEnv is "per tenant".
+ // Temporary registration within the AuthEnv of the
+ // user:password is acceptable.
+ authEnv.unregisterUsernamePassword(finalKey);
+ });
}
/**
@@ -640,25 +650,32 @@ public class HttpLib {
* @return HttpResponse
*/
public static <T> HttpResponse<T> executeJDK(HttpClient httpClient,
HttpRequest httpRequest, BodyHandler<T> bodyHandler) {
- try {
- // This is the one place all HTTP requests go through.
- logRequest(httpRequest);
- HttpResponse<T> httpResponse = httpClient.send(httpRequest,
bodyHandler);
- logResponse(httpResponse);
- return httpResponse;
- //} catch (HttpTimeoutException ex) {
- } catch (IOException | InterruptedException ex) {
- if ( ex.getMessage() != null ) {
- // This is silly.
- // Rather than an HTTP exception, bad authentication becomes
IOException("too many authentication attempts");
- // or IOException("No credentials provided") if the
authenticator decides to return null.
- if ( ex.getMessage().contains("too many authentication
attempts") ||
- ex.getMessage().contains("No credentials provided") ) {
- throw new HttpException(401, HttpSC.getMessage(401));
- }
- }
- throw new HttpException(httpRequest.method()+"
"+httpRequest.uri().toString(), ex);
- }
+ return AsyncHttpRDF.getOrElseThrow(executeJDKAsync(httpClient,
httpRequest, bodyHandler), httpRequest);
+ }
+
+ /**
+ * Execute request and return a {@code HttpResponse<InputStream>} response.
+ * Status codes have not been handled. The response can be passed to
+ * {@link #handleResponseInputStream(HttpResponse)} which will convert
non-2xx
+ * status code to {@link HttpException HttpExceptions}.
+ *
+ * @param httpClient
+ * @param httpRequest
+ * @return HttpResponse
+ */
+ public static CompletableFuture<HttpResponse<InputStream>>
executeJDKAsync(HttpClient httpClient, HttpRequest httpRequest) {
+ return executeAsync(httpClient, httpRequest,
BodyHandlers.ofInputStream());
+ }
+
+ public static <T> CompletableFuture<HttpResponse<T>>
executeJDKAsync(HttpClient httpClient, HttpRequest httpRequest, BodyHandler<T>
bodyHandler) {
+ // This is the one place all HTTP requests go through.
+ logRequest(httpRequest);
+ CompletableFuture<HttpResponse<T>> future =
httpClient.sendAsync(httpRequest, bodyHandler)
+ .thenApply(httpResponse -> {
+ logResponse(httpResponse);
+ return httpResponse;
+ });
+ return future;
}
/*package*/ static CompletableFuture<HttpResponse<InputStream>>
asyncExecute(HttpClient httpClient, HttpRequest httpRequest) {
diff --git a/jena-arq/src/main/java/org/apache/jena/http/auth/AuthLib.java
b/jena-arq/src/main/java/org/apache/jena/http/auth/AuthLib.java
index 504d36fc5c..c00c50a2c5 100644
--- a/jena-arq/src/main/java/org/apache/jena/http/auth/AuthLib.java
+++ b/jena-arq/src/main/java/org/apache/jena/http/auth/AuthLib.java
@@ -29,17 +29,19 @@ import java.net.http.HttpResponse.BodyHandler;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import org.apache.jena.atlas.lib.Bytes;
import org.apache.jena.atlas.web.AuthScheme;
import org.apache.jena.atlas.web.HttpException;
+import org.apache.jena.http.AsyncHttpRDF;
import org.apache.jena.http.HttpLib;
import org.apache.jena.riot.web.HttpNames;
import org.apache.jena.web.HttpSC;
public class AuthLib {
/**
- * Call {@link HttpClient#send} after applying an active {@link
AuthRequestModifier}
+ * Call the {@link HttpClient} after applying an active {@link
AuthRequestModifier}
* to modify the {@link java.net.http.HttpRequest.Builder}.
* If no {@link AuthRequestModifier} is available and if a 401 response is
received,
* setup a {@link AuthRequestModifier} passed on registered username and
password information.
@@ -51,24 +53,41 @@ public class AuthLib {
* @return HttpResponse<T>
*/
public static <T> HttpResponse<T> authExecute(HttpClient httpClient,
HttpRequest httpRequest, BodyHandler<T> bodyHandler) {
- HttpResponse<T> httpResponse = HttpLib.executeJDK(httpClient,
httpRequest, bodyHandler);
+ return AsyncHttpRDF.getOrElseThrow(authExecuteAsync(httpClient,
httpRequest, bodyHandler), httpRequest);
+ }
- // -- 401 handling.
- if ( httpResponse.statusCode() != 401 )
- return httpResponse;
- HttpResponse<T> httpResponse2 = handle401(httpClient, httpRequest,
bodyHandler, httpResponse);
- return httpResponse2;
+ /**
+ * Call {@link HttpClient#sendAsync} after applying an active {@link
AuthRequestModifier}
+ * to modify the {@link java.net.http.HttpRequest.Builder}.
+ * If no {@link AuthRequestModifier} is available and if a 401 response is
received,
+ * set up an {@link AuthRequestModifier} based on registered username and
password information.
+ * This function supports basic and digest authentication.
+ *
+ * @param httpClient HttpClient
+ * @param httpRequest
+ * @param bodyHandler
+ * @return CompletableFuture<HttpResponse<T>>
+ */
+ public static <T> CompletableFuture<HttpResponse<T>>
authExecuteAsync(HttpClient httpClient, HttpRequest httpRequest, BodyHandler<T>
bodyHandler) {
+ return HttpLib.executeJDKAsync(httpClient, httpRequest, bodyHandler)
+ .thenCompose(httpResponse -> {
+ // -- 401 handling.
+ if ( httpResponse.statusCode() != 401 )
+ return CompletableFuture.completedFuture(httpResponse);
+ CompletableFuture<HttpResponse<T>> httpResponse2 =
handle401Async(httpClient, httpRequest, bodyHandler, httpResponse);
+ return httpResponse2;
+ });
}
/* Handle a 401 (authentication challenge). */
- private static <T> HttpResponse<T> handle401(HttpClient httpClient,
+ private static <T> CompletableFuture<HttpResponse<T>>
handle401Async(HttpClient httpClient,
HttpRequest request,
BodyHandler<T> bodyHandler,
HttpResponse<T>
httpResponse401) {
AuthChallenge aHeader = wwwAuthenticateHeader(httpResponse401);
if ( aHeader == null )
// No valid header - simply return the original response.
- return httpResponse401;
+ return CompletableFuture.completedFuture(httpResponse401);
// Currently on a URI endpoint-by-endpoint basis.
// String realm = aHeader.getRealm();
@@ -102,14 +121,14 @@ public class AuthLib {
}
case UNKNOWN :
// Not handled. Pass back the 401.
- return httpResponse401;
+ return CompletableFuture.completedFuture(httpResponse401);
default:
throw new HttpException("Not an authentication scheme --
"+aHeader.authScheme);
}
// Failed to generate a request modifier for a retry.
if ( authRequestModifier == null)
- return httpResponse401;
+ return CompletableFuture.completedFuture(httpResponse401);
// ---- Register for next time the app calls this URI.
AuthEnv.get().registerAuthModifier(request.uri().toString(),
authRequestModifier);
@@ -119,7 +138,7 @@ public class AuthLib {
request2builder = authRequestModifier.addAuth(request2builder);
HttpRequest httpRequest2 = request2builder.build();
- HttpResponse<T> httpResponse2 = HttpLib.executeJDK(httpClient,
httpRequest2, bodyHandler);
+ CompletableFuture<HttpResponse<T>> httpResponse2 =
HttpLib.executeJDKAsync(httpClient, httpRequest2, bodyHandler);
// Pass back to application regardless of response code.
return httpResponse2;
}
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java
index 953819a18b..9e4582ad46 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java
@@ -21,6 +21,7 @@ package org.apache.jena.sparql.exec.http;
import static org.apache.jena.http.HttpLib.*;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
@@ -29,8 +30,12 @@ import java.net.http.HttpResponse;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.ClosedInputStream;
+import org.apache.commons.io.input.ProxyInputStream;
import org.apache.jena.atlas.RuntimeIOException;
import org.apache.jena.atlas.io.IO;
import org.apache.jena.atlas.iterator.Iter;
@@ -44,6 +49,7 @@ import org.apache.jena.atlas.web.HttpException;
import org.apache.jena.atlas.web.MediaType;
import org.apache.jena.graph.Graph;
import org.apache.jena.graph.Triple;
+import org.apache.jena.http.AsyncHttpRDF;
import org.apache.jena.http.HttpEnv;
import org.apache.jena.http.HttpLib;
import org.apache.jena.query.*;
@@ -111,13 +117,27 @@ public class QueryExecHTTP implements QueryExec {
// Received content type
private String httpResponseContentType = null;
- // Releasing HTTP input streams is important. We remember this for SELECT
result
- // set streaming, and will close it when the execution is closed
- private volatile InputStream retainedConnection = null;
private HttpClient httpClient = HttpEnv.getDftHttpClient();
private Map<String, String> httpHeaders;
+ // ----- Cancellation -----
+
+ private volatile boolean isAborted = false;
+ private final Object abortLock = new Object();
+ private volatile CompletableFuture<HttpResponse<InputStream>> future =
null;
+
+ // Releasing HTTP input streams is important. We remember this for SELECT
result
+ // set streaming, and will close it when the execution is closed
+ // This is the physical InputStream of the HTTP request which will only be
closed by close().
+ private InputStream retainedConnection = null;
+
+ // This is a wrapped view of retainedConnection that will be closed by
abort().
+ private volatile InputStream retainedConnectionView = null;
+
+ // Whether abort cancels an async HTTP request's future immediately.
+ private boolean cancelFutureOnAbort = true;
+
/**
* This constructor is superseded by the other one which has more
parameters.
* The recommended way to create instances of this class is via {@link
QueryExecHTTPBuilder}.
@@ -218,12 +238,9 @@ public class QueryExecHTTP implements QueryExec {
}
private RowSet execRowSet() {
- // Use the explicitly given header or the default selectAcceptheader
- String thisAcceptHeader = dft(overrideAcceptHeader,
selectAcceptHeader);
-
- HttpRequest request = effectiveHttpRequest(thisAcceptHeader);
+ HttpRequest request = effectiveHttpRequest(selectAcceptHeader);
HttpResponse<InputStream> response = executeQuery(request);
- InputStream in = HttpLib.getInputStream(response);
+ InputStream in = registerInputStream(response);
// Don't assume the endpoint actually gives back the content type we
asked for
String actualContentType = responseHeader(response,
HttpNames.hContentType);
@@ -240,8 +257,6 @@ public class QueryExecHTTP implements QueryExec {
in = new ByteArrayInputStream(b);
}
- retainedConnection = in; // This will be closed on close()
-
if (actualContentType == null || actualContentType.equals(""))
actualContentType = WebContent.contentTypeResultsXML;
@@ -266,10 +281,9 @@ public class QueryExecHTTP implements QueryExec {
public boolean ask() {
checkNotClosed();
check(QueryType.ASK);
- String thisAcceptHeader = dft(overrideAcceptHeader, askAcceptHeader);
- HttpRequest request = effectiveHttpRequest(thisAcceptHeader);
+ HttpRequest request = effectiveHttpRequest(askAcceptHeader);
HttpResponse<InputStream> response = executeQuery(request);
- InputStream in = HttpLib.getInputStream(response);
+ InputStream in = registerInputStream(response);
String actualContentType = responseHeader(response,
HttpNames.hContentType);
httpResponseContentType = actualContentType;
@@ -406,10 +420,10 @@ public class QueryExecHTTP implements QueryExec {
// ifNoContentType - some wild guess at the content type.
private Pair<InputStream, Lang> execRdfWorker(String contentType, String
ifNoContentType) {
checkNotClosed();
- String thisAcceptHeader = dft(overrideAcceptHeader, contentType);
+ String thisAcceptHeader = contentType;
HttpRequest request = effectiveHttpRequest(thisAcceptHeader);
HttpResponse<InputStream> response = executeQuery(request);
- InputStream in = HttpLib.getInputStream(response);
+ InputStream in = registerInputStream(response);
// Don't assume the endpoint actually gives back the content type we
asked for
String actualContentType = responseHeader(response,
HttpNames.hContentType);
@@ -437,7 +451,7 @@ public class QueryExecHTTP implements QueryExec {
String thisAcceptHeader = dft(overrideAcceptHeader,
WebContent.contentTypeJSON);
HttpRequest request = effectiveHttpRequest(thisAcceptHeader);
HttpResponse<InputStream> response = executeQuery(request);
- InputStream in = HttpLib.getInputStream(response);
+ InputStream in = registerInputStream(response);
try {
return JSON.parseAny(in).getAsArray();
} finally { finishInputStream(in); }
@@ -455,11 +469,6 @@ public class QueryExecHTTP implements QueryExec {
return x.iterator();
}
- private void checkNotClosed() {
- if ( closed )
- throw new QueryExecException("HTTP QueryExecHTTP has been closed");
- }
-
private void check(QueryType queryType) {
if ( query == null ) {
// Pass through the queryString.
@@ -590,15 +599,27 @@ public class QueryExecHTTP implements QueryExec {
}
/**
- * Execute an HttpRequest.
+ * Execute an HttpRequest and wait for the HttpResponse.
+ * A call to {@link #abort()} interrupts the wait.
* The response is returned after status code processing so the caller can
assume the
* query execution was successful and return 200.
* Use {@link HttpLib#getInputStream} to access the body.
*/
private HttpResponse<InputStream> executeQuery(HttpRequest request) {
- logQuery(queryString, request);
+ checkNotClosed();
+
+ if (future != null) {
+ throw new IllegalStateException("Execution was already started.");
+ }
+
try {
- HttpResponse<InputStream> response = execute(httpClient, request);
+ synchronized (abortLock) {
+ checkNotAborted();
+ logQuery(queryString, request);
+ future = HttpLib.executeAsync(httpClient, request);
+ }
+
+ HttpResponse<InputStream> response =
AsyncHttpRDF.getOrElseThrow(future, request);
HttpLib.handleHttpStatusCode(response);
return response;
} catch (HttpException httpEx) {
@@ -676,22 +697,69 @@ public class QueryExecHTTP implements QueryExec {
/**
* Cancel query evaluation
*/
- public void cancel() {
- closed = true;
- }
-
@Override
public void abort() {
- try {
- close();
- } catch (Exception ex) {
- Log.warn(this, "Error during abort", ex);
+ // Setting abort to true causes the next read from
+ // retainedConnectionView (if already created) to
+ // fail with a QueryCancelledException.
+ isAborted = true;
+ if (cancelFutureOnAbort) {
+ cancelFuture(future);
+ }
+ }
+
+ private InputStream registerInputStream(HttpResponse<InputStream>
httpResponse) {
+ InputStream in = HttpLib.getInputStream(httpResponse);
+ registerInputStream(in);
+ return in;
+ }
+
+ /**
+ * Set the given input stream as the 'retainedConnection' and create a
corresponding
+ * asynchronously abortable 'retainedConnectionView'. The latter is
returned.
+ * If execution was already aborted then a {@link QueryCancelledException}
is raised.
+ */
+ private InputStream registerInputStream(InputStream input) {
+ synchronized (abortLock) {
+ this.retainedConnection = input;
+ // Note: Used ProxyInputStream because the ctor of
CloseShieldInputStream is deprecated.
+ this.retainedConnectionView = new ProxyInputStream(input) {
+ @Override
+ protected void beforeRead(int n) throws IOException {
+ checkNotAborted();
+ super.beforeRead(n);
+ }
+ @Override
+ public void close() {
+ this.in = ClosedInputStream.INSTANCE;
+ }
+ };
+
+ // If already aborted then bail out before starting the parsers.
+ checkNotAborted();
}
+ return retainedConnectionView;
}
@Override
public void close() {
closed = true;
+ // No need to handle the future here, because the possible states are:
+ // - Null because no execution was started -> retainedConnection is
null.
+ // - Cancelled by asynchronous abort -> retainedConnection is
null.
+ // - Completed successfully by the same thread that now closes the
retainedConnection
+ // -> retainedConnection is
non-null.
+ IOUtils.closeQuietly(retainedConnectionView);
+ closeRetainedConnection();
+ }
+
+ private static void cancelFuture(CompletableFuture<?> future) {
+ if (future != null) {
+ future.cancel(true);
+ }
+ }
+
+ private void closeRetainedConnection() {
if (retainedConnection != null) {
try {
// This call may take a long time if the response has not been
consumed
@@ -711,6 +779,16 @@ public class QueryExecHTTP implements QueryExec {
}
}
+ private void checkNotClosed() {
+ if ( closed )
+ throw new QueryExecException("HTTP QueryExecHTTP has been closed");
+ }
+
+ protected void checkNotAborted() {
+ if ( isAborted )
+ throw new QueryCancelledException();
+ }
+
@Override
public boolean isClosed() { return closed; }
}
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/Service.java
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/Service.java
index d65db0b21b..f8339401c7 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/Service.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/Service.java
@@ -227,7 +227,7 @@ public class Service {
// -- End setup
// Build the execution
- QueryExecHTTP qExec = QueryExecHTTP.newBuilder()
+ try (QueryExecHTTP qExec = QueryExecHTTP.newBuilder()
.endpoint(serviceURL)
.timeout(timeoutMillis, TimeUnit.MILLISECONDS)
.httpHeader(HttpNames.hUserAgent, HttpEnv.UserAgent)
@@ -236,8 +236,8 @@ public class Service {
.context(context)
.httpClient(httpClient)
.sendMode(querySendMode)
- .build();
- try {
+ .build()) {
+
// Detach from the network stream.
RowSet rowSet = qExec.select().materialize();
QueryIterator qIter = QueryIterPlainWrapper.create(rowSet);
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java
index 102b38df2c..a2ebaa1cda 100644
---
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java
+++
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java
@@ -54,7 +54,9 @@ public class UpdateExecHTTP implements UpdateExec {
private final Context context;
private final String service;
- // Not used private final UpdateRequest update;
+
+ // UpdateRequest as an object - may be null.
+ private final UpdateRequest update;
private final String updateString;
private final Map<String, String> httpHeaders;
private final HttpClient httpClient;
@@ -77,7 +79,7 @@ public class UpdateExecHTTP implements UpdateExec {
long timeout, TimeUnit timeoutUnit) {
this.context = context;
this.service = serviceURL;
- //this.update = update;
+ this.update = update;
// Builder ensures one or the other is set.
this.updateString = ( updateString != null ) ? updateString :
update.toString();
this.httpClient = dft(httpClient, HttpEnv.getDftHttpClient());
@@ -95,6 +97,16 @@ public class UpdateExecHTTP implements UpdateExec {
return context;
}
+ @Override
+ public UpdateRequest getUpdateRequest() {
+ return update;
+ }
+
+ @Override
+ public String getUpdateRequestString() {
+ return updateString;
+ }
+
@Override
public void execute() {
Params thisParams = Params.create(params);
diff --git a/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java
b/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java
index 49ede54058..174e921cd8 100644
--- a/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java
+++ b/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java
@@ -26,6 +26,19 @@ import org.apache.jena.sparql.util.Context;
*/
public interface UpdateProcessor
{
+ /**
+ * The update request associated with this update execution. May be null.
+ */
+ default public UpdateRequest getUpdateRequest() { return null; }
+
+ /**
+ * The update request as a string. May be null.
+ * The string may contain syntax extensions that can not be parsed by Jena.
+ * If {@link #getUpdateRequest()} is not null then this is a corresponding
+ * string that parses to the same update request.
+ */
+ default public String getUpdateRequestString() { return null; }
+
/** Execute */
public void execute() ;