This is an automated email from the ASF dual-hosted git repository.

andy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/jena.git


The following commit(s) were added to refs/heads/main by this push:
     new f90380630f GH-3471: Async HTTP
f90380630f is described below

commit f90380630f11965276816edecc1cbbdf7c6dfa72
Author: Claus Stadler <[email protected]>
AuthorDate: Wed Oct 1 17:24:34 2025 +0200

    GH-3471: Async HTTP
---
 .../org/apache/jena/atlas/web/HttpException.java   |  11 +-
 .../java/org/apache/jena/http/AsyncHttpRDF.java    |  44 +++++--
 .../main/java/org/apache/jena/http/HttpLib.java    | 107 +++++++++-------
 .../java/org/apache/jena/http/auth/AuthLib.java    |  43 +++++--
 .../jena/sparql/exec/http/QueryExecHTTP.java       | 142 ++++++++++++++++-----
 .../org/apache/jena/sparql/exec/http/Service.java  |   6 +-
 .../jena/sparql/exec/http/UpdateExecHTTP.java      |  16 ++-
 .../org/apache/jena/update/UpdateProcessor.java    |  13 ++
 8 files changed, 273 insertions(+), 109 deletions(-)

diff --git 
a/jena-arq/src/main/java/org/apache/jena/atlas/web/HttpException.java 
b/jena-arq/src/main/java/org/apache/jena/atlas/web/HttpException.java
index 5597e80947..23edb47be6 100644
--- a/jena-arq/src/main/java/org/apache/jena/atlas/web/HttpException.java
+++ b/jena-arq/src/main/java/org/apache/jena/atlas/web/HttpException.java
@@ -34,14 +34,15 @@ public class HttpException extends RuntimeException {
     }
 
     public HttpException(int statusCode, String statusLine) {
-        super(exMessage(statusCode, statusLine));
-        this.statusCode = statusCode;
-        this.statusLine = statusLine ;
-        this.response = null;
+        this(statusCode, statusLine, null, null);
     }
 
     public HttpException(int statusCode, String statusLine, String 
responseMessage) {
-        super(exMessage(statusCode, statusLine));
+        this(statusCode, statusLine, responseMessage, null);
+    }
+
+    public HttpException(int statusCode, String statusLine, String 
responseMessage, Throwable cause) {
+        super(exMessage(statusCode, statusLine), cause);
         this.statusCode = statusCode;
         this.statusLine = statusLine ;
         this.response = responseMessage;
diff --git a/jena-arq/src/main/java/org/apache/jena/http/AsyncHttpRDF.java 
b/jena-arq/src/main/java/org/apache/jena/http/AsyncHttpRDF.java
index 5eeee4429b..b84df0075d 100644
--- a/jena-arq/src/main/java/org/apache/jena/http/AsyncHttpRDF.java
+++ b/jena-arq/src/main/java/org/apache/jena/http/AsyncHttpRDF.java
@@ -160,26 +160,50 @@ public class AsyncHttpRDF {
      * This operation extracts RuntimeException from the {@code 
CompletableFuture}.
      */
     public static <T> T getOrElseThrow(CompletableFuture<T> cf) {
+        return getOrElseThrow(cf, null);
+    }
+
+    /**
+     * Get the value of a {@link CompletableFuture} that executes of an HTTP 
request.
+     * In case on any error, an {@link HttpException} is thrown.
+     *
+     * @param <T> The type of the value being computed.
+     * @param cf The completable future.
+     * @param httpRequest An optional HttpRequest for improving feedback in 
case of exceptions.
+     * @return The value computed by the completable future.
+     */
+    public static <T> T getOrElseThrow(CompletableFuture<T> cf, HttpRequest 
httpRequest) {
         Objects.requireNonNull(cf);
         try {
             return cf.join();
         //} catch (CancellationException ex1) { // Let this pass out.
         } catch (CompletionException ex) {
-            if ( ex.getCause() != null ) {
-                Throwable cause = ex.getCause();
-                if ( cause instanceof RuntimeException )
-                    throw (RuntimeException)cause;
+            Throwable cause = ex.getCause();
+            if ( cause != null ) {
+
+                // Pass on our own HttpException instances such as 401 
Unauthorized.
+                if ( cause instanceof HttpException httpEx ) {
+                    throw new HttpException(httpEx.getStatusCode(), 
httpEx.getStatusLine(), httpEx.getResponse(), cause);
+                }
+
+                final String msg = cause.getMessage();
+
                 if ( cause instanceof IOException ) {
-                    IOException iox = (IOException)cause;
                     // Rather than an HTTP exception, bad authentication 
becomes IOException("too many authentication attempts");
-                    if ( iox.getMessage().contains("too many authentication 
attempts") ||
-                            iox.getMessage().contains("No credentials 
provided") ) {
-                        throw new HttpException(401, HttpSC.getMessage(401));
+                    if ( msg != null &&
+                            ( msg.contains("too many authentication attempts") 
||
+                              msg.contains("No credentials provided") ) ) {
+                        throw new HttpException(401, HttpSC.getMessage(401), 
null, cause);
+                    }
+                    if (httpRequest != null) {
+                        throw new HttpException(httpRequest.method()+" 
"+httpRequest.uri().toString(), cause);
                     }
-                    IO.exception((IOException)cause);
                 }
+
+                throw new HttpException(msg, cause);
             }
-            throw ex;
+            // Note: CompletionException without cause should never happen.
+            throw new HttpException(ex);
         }
     }
 
diff --git a/jena-arq/src/main/java/org/apache/jena/http/HttpLib.java 
b/jena-arq/src/main/java/org/apache/jena/http/HttpLib.java
index 1895105624..56205d9619 100644
--- a/jena-arq/src/main/java/org/apache/jena/http/HttpLib.java
+++ b/jena-arq/src/main/java/org/apache/jena/http/HttpLib.java
@@ -37,6 +37,7 @@ import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -560,7 +561,7 @@ public class HttpLib {
      * @return HttpResponse
      */
     public static HttpResponse<InputStream> execute(HttpClient httpClient, 
HttpRequest httpRequest) {
-        return execute(httpClient, httpRequest, BodyHandlers.ofInputStream());
+        return AsyncHttpRDF.getOrElseThrow(executeAsync(httpClient, 
httpRequest, BodyHandlers.ofInputStream()), httpRequest);
     }
 
     /**
@@ -580,13 +581,35 @@ public class HttpLib {
      *
      * @param httpClient
      * @param httpRequest
-     * @param bodyHandler
      * @return HttpResponse
      */
-    /*package*/ static <X> HttpResponse<X> execute(HttpClient httpClient, 
HttpRequest httpRequest, BodyHandler<X> bodyHandler) {
+    public static CompletableFuture<HttpResponse<InputStream>> 
executeAsync(HttpClient httpClient, HttpRequest httpRequest) {
+        return executeAsync(httpClient, httpRequest, 
BodyHandlers.ofInputStream());
+    }
+
+    /**
+     * Execute a request, return a {@code HttpResponse<X>} which
+     * can be passed to {@link #handleHttpStatusCode(HttpResponse)} which will
+     * convert non-2xx status code to {@link HttpException HttpExceptions}.
+     * <p>
+     * This function applies the HTTP authentication challenge support
+     * and will repeat the request if necessary with added authentication.
+     * <p>
+     * See {@link AuthEnv} for authentication registration.
+     * <br/>
+     * See {@link #executeJDK} to execute exactly once without challenge 
response handling.
+     *
+     * @see AuthEnv AuthEnv for authentic registration
+     * @see #executeJDK executeJDK to execute exacly once.
+     *
+     * @param httpClient
+     * @param httpRequest
+     * @param bodyHandler
+     * @return HttpResponse
+     */    /*package*/ static <X> CompletableFuture<HttpResponse<X>> 
executeAsync(HttpClient httpClient, HttpRequest httpRequest, BodyHandler<X> 
bodyHandler) {
         // To run with no jena-supplied authentication handling.
         if ( false )
-            return executeJDK(httpClient, httpRequest, bodyHandler);
+            return executeJDKAsync(httpClient, httpRequest, bodyHandler);
         URI uri = httpRequest.uri();
         URI key = null;
 
@@ -602,29 +625,16 @@ public class HttpLib {
                 authEnv.registerUsernamePassword(key, userpasswd[0], 
userpasswd[1]);
             }
         }
-        try {
-            return AuthLib.authExecute(httpClient, httpRequest, bodyHandler);
-        } finally {
-            if ( key != null )
-                // The AuthEnv is "per tenant".
-                // Temporary registration within the AuthEnv of the
-                // user:password is acceptable.
-                authEnv.unregisterUsernamePassword(key);
-        }
-    }
 
-    /**
-     * Execute request and return a {@code HttpResponse<InputStream>} response.
-     * Status codes have not been handled. The response can be passed to
-     * {@link #handleResponseInputStream(HttpResponse)} which will convert 
non-2xx
-     * status code to {@link HttpException HttpExceptions}.
-     *
-     * @param httpClient
-     * @param httpRequest
-     * @return HttpResponse
-     */
-    public static HttpResponse<InputStream> executeJDK(HttpClient httpClient, 
HttpRequest httpRequest) {
-        return execute(httpClient, httpRequest, BodyHandlers.ofInputStream());
+        URI finalKey = key;
+        return AuthLib.authExecuteAsync(httpClient, httpRequest, bodyHandler)
+            .whenComplete((httpResponse, throwable) -> {
+                if ( finalKey != null )
+                    // The AuthEnv is "per tenant".
+                    // Temporary registration within the AuthEnv of the
+                    // user:password is acceptable.
+                    authEnv.unregisterUsernamePassword(finalKey);
+            });
     }
 
     /**
@@ -640,25 +650,32 @@ public class HttpLib {
      * @return HttpResponse
      */
     public static <T> HttpResponse<T> executeJDK(HttpClient httpClient, 
HttpRequest httpRequest, BodyHandler<T> bodyHandler) {
-        try {
-            // This is the one place all HTTP requests go through.
-            logRequest(httpRequest);
-            HttpResponse<T> httpResponse = httpClient.send(httpRequest, 
bodyHandler);
-            logResponse(httpResponse);
-            return httpResponse;
-        //} catch (HttpTimeoutException ex) {
-        } catch (IOException | InterruptedException ex) {
-            if ( ex.getMessage() != null ) {
-                // This is silly.
-                // Rather than an HTTP exception, bad authentication becomes 
IOException("too many authentication attempts");
-                // or IOException("No credentials provided") if the 
authenticator decides to return null.
-                if ( ex.getMessage().contains("too many authentication 
attempts") ||
-                     ex.getMessage().contains("No credentials provided") ) {
-                    throw new HttpException(401, HttpSC.getMessage(401));
-                }
-            }
-            throw new HttpException(httpRequest.method()+" 
"+httpRequest.uri().toString(), ex);
-        }
+        return AsyncHttpRDF.getOrElseThrow(executeJDKAsync(httpClient, 
httpRequest, bodyHandler), httpRequest);
+    }
+
+    /**
+     * Execute request and return a {@code HttpResponse<InputStream>} response.
+     * Status codes have not been handled. The response can be passed to
+     * {@link #handleResponseInputStream(HttpResponse)} which will convert 
non-2xx
+     * status code to {@link HttpException HttpExceptions}.
+     *
+     * @param httpClient
+     * @param httpRequest
+     * @return HttpResponse
+     */
+    public static CompletableFuture<HttpResponse<InputStream>> 
executeJDKAsync(HttpClient httpClient, HttpRequest httpRequest) {
+        return executeAsync(httpClient, httpRequest, 
BodyHandlers.ofInputStream());
+    }
+
+    public static <T> CompletableFuture<HttpResponse<T>> 
executeJDKAsync(HttpClient httpClient, HttpRequest httpRequest, BodyHandler<T> 
bodyHandler) {
+        // This is the one place all HTTP requests go through.
+        logRequest(httpRequest);
+        CompletableFuture<HttpResponse<T>> future = 
httpClient.sendAsync(httpRequest, bodyHandler)
+            .thenApply(httpResponse -> {
+                logResponse(httpResponse);
+                return httpResponse;
+            });
+        return future;
     }
 
     /*package*/ static CompletableFuture<HttpResponse<InputStream>> 
asyncExecute(HttpClient httpClient, HttpRequest httpRequest) {
diff --git a/jena-arq/src/main/java/org/apache/jena/http/auth/AuthLib.java 
b/jena-arq/src/main/java/org/apache/jena/http/auth/AuthLib.java
index 504d36fc5c..c00c50a2c5 100644
--- a/jena-arq/src/main/java/org/apache/jena/http/auth/AuthLib.java
+++ b/jena-arq/src/main/java/org/apache/jena/http/auth/AuthLib.java
@@ -29,17 +29,19 @@ import java.net.http.HttpResponse.BodyHandler;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.jena.atlas.lib.Bytes;
 import org.apache.jena.atlas.web.AuthScheme;
 import org.apache.jena.atlas.web.HttpException;
+import org.apache.jena.http.AsyncHttpRDF;
 import org.apache.jena.http.HttpLib;
 import org.apache.jena.riot.web.HttpNames;
 import org.apache.jena.web.HttpSC;
 
 public class AuthLib {
     /**
-     * Call {@link HttpClient#send} after applying an active {@link 
AuthRequestModifier}
+     * Call the {@link HttpClient} after applying an active {@link 
AuthRequestModifier}
      * to modify the {@link java.net.http.HttpRequest.Builder}.
      * If no {@link AuthRequestModifier} is available and if a 401 response is 
received,
      * setup a {@link AuthRequestModifier} passed on registered username and 
password information.
@@ -51,24 +53,41 @@ public class AuthLib {
      * @return HttpResponse&lt;T&gt;
      */
     public static <T> HttpResponse<T> authExecute(HttpClient httpClient, 
HttpRequest httpRequest, BodyHandler<T> bodyHandler) {
-        HttpResponse<T> httpResponse = HttpLib.executeJDK(httpClient, 
httpRequest, bodyHandler);
+        return AsyncHttpRDF.getOrElseThrow(authExecuteAsync(httpClient, 
httpRequest, bodyHandler), httpRequest);
+    }
 
-        // -- 401 handling.
-        if ( httpResponse.statusCode() != 401 )
-            return httpResponse;
-        HttpResponse<T> httpResponse2 = handle401(httpClient, httpRequest, 
bodyHandler, httpResponse);
-        return httpResponse2;
+    /**
+     * Call {@link HttpClient#sendAsync} after applying an active {@link 
AuthRequestModifier}
+     * to modify the {@link java.net.http.HttpRequest.Builder}.
+     * If no {@link AuthRequestModifier} is available and if a 401 response is 
received,
+     * setup a {@link AuthRequestModifier} passed on registered username and 
password information.
+     * This function supports basic and digest authentication.
+     *
+     * @param httpClient HttpClient
+     * @param httpRequest
+     * @param bodyHandler
+     * @return CompletableFuture&lt;HttpResponse&lt;T&gt;&gt;
+     */
+    public static <T> CompletableFuture<HttpResponse<T>> 
authExecuteAsync(HttpClient httpClient, HttpRequest httpRequest, BodyHandler<T> 
bodyHandler) {
+        return HttpLib.executeJDKAsync(httpClient, httpRequest, bodyHandler)
+            .thenCompose(httpResponse -> {
+                // -- 401 handling.
+                if ( httpResponse.statusCode() != 401 )
+                    return CompletableFuture.completedFuture(httpResponse);
+                CompletableFuture<HttpResponse<T>> httpResponse2 = 
handle401Async(httpClient, httpRequest, bodyHandler, httpResponse);
+                return httpResponse2;
+            });
     }
 
     /* Handle a 401 (authentication challenge). */
-    private static <T> HttpResponse<T> handle401(HttpClient httpClient,
+    private static <T> CompletableFuture<HttpResponse<T>> 
handle401Async(HttpClient httpClient,
                                                  HttpRequest request,
                                                  BodyHandler<T> bodyHandler,
                                                  HttpResponse<T> 
httpResponse401) {
         AuthChallenge aHeader = wwwAuthenticateHeader(httpResponse401);
         if ( aHeader == null )
             // No valid header - simply return the original response.
-            return httpResponse401;
+            return CompletableFuture.completedFuture(httpResponse401);
 
         // Currently on a URI endpoint-by-endpoint basis.
         // String realm = aHeader.getRealm();
@@ -102,14 +121,14 @@ public class AuthLib {
             }
             case UNKNOWN :
                 // Not handled. Pass back the 401.
-                return httpResponse401;
+                return CompletableFuture.completedFuture(httpResponse401);
             default:
                 throw new HttpException("Not an authentication scheme -- 
"+aHeader.authScheme);
         }
 
         // Failed to generate a request modifier for a retry.
         if ( authRequestModifier == null)
-            return httpResponse401;
+            return CompletableFuture.completedFuture(httpResponse401);
 
         // ---- Register for next time the app calls this URI.
         AuthEnv.get().registerAuthModifier(request.uri().toString(), 
authRequestModifier);
@@ -119,7 +138,7 @@ public class AuthLib {
         request2builder = authRequestModifier.addAuth(request2builder);
 
         HttpRequest httpRequest2 = request2builder.build();
-        HttpResponse<T> httpResponse2 = HttpLib.executeJDK(httpClient, 
httpRequest2, bodyHandler);
+        CompletableFuture<HttpResponse<T>> httpResponse2 = 
HttpLib.executeJDKAsync(httpClient, httpRequest2, bodyHandler);
         // Pass back to application regardless of response code.
         return httpResponse2;
     }
diff --git 
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java 
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java
index 953819a18b..9e4582ad46 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/QueryExecHTTP.java
@@ -21,6 +21,7 @@ package org.apache.jena.sparql.exec.http;
 import static org.apache.jena.http.HttpLib.*;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.net.http.HttpClient;
 import java.net.http.HttpRequest;
@@ -29,8 +30,12 @@ import java.net.http.HttpResponse;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.ClosedInputStream;
+import org.apache.commons.io.input.ProxyInputStream;
 import org.apache.jena.atlas.RuntimeIOException;
 import org.apache.jena.atlas.io.IO;
 import org.apache.jena.atlas.iterator.Iter;
@@ -44,6 +49,7 @@ import org.apache.jena.atlas.web.HttpException;
 import org.apache.jena.atlas.web.MediaType;
 import org.apache.jena.graph.Graph;
 import org.apache.jena.graph.Triple;
+import org.apache.jena.http.AsyncHttpRDF;
 import org.apache.jena.http.HttpEnv;
 import org.apache.jena.http.HttpLib;
 import org.apache.jena.query.*;
@@ -111,13 +117,27 @@ public class QueryExecHTTP implements QueryExec {
 
     // Received content type
     private String httpResponseContentType = null;
-    // Releasing HTTP input streams is important. We remember this for SELECT 
result
-    // set streaming, and will close it when the execution is closed
-    private volatile InputStream retainedConnection = null;
 
     private HttpClient httpClient = HttpEnv.getDftHttpClient();
     private Map<String, String> httpHeaders;
 
+    // ----- Cancellation -----
+
+    private volatile boolean isAborted = false;
+    private final Object abortLock = new Object();
+    private volatile CompletableFuture<HttpResponse<InputStream>> future = 
null;
+
+    // Releasing HTTP input streams is important. We remember this for SELECT 
result
+    // set streaming, and will close it when the execution is closed
+    // This is the physical InputStream of the HTTP request which will only be 
closed by close().
+    private InputStream retainedConnection = null;
+
+    // This is a wrapped view of retainedConnection that will be closed by 
abort().
+    private volatile InputStream retainedConnectionView = null;
+
+    // Whether abort cancels an async HTTP request's future immediately.
+    private boolean cancelFutureOnAbort = true;
+
     /**
      * This constructor is superseded by the other one which has more 
parameters.
      * The recommended way to create instances of this class is via {@link 
QueryExecHTTPBuilder}.
@@ -218,12 +238,9 @@ public class QueryExecHTTP implements QueryExec {
     }
 
     private RowSet execRowSet() {
-        // Use the explicitly given header or the default selectAcceptheader
-        String thisAcceptHeader = dft(overrideAcceptHeader, 
selectAcceptHeader);
-
-        HttpRequest request = effectiveHttpRequest(thisAcceptHeader);
+        HttpRequest request = effectiveHttpRequest(selectAcceptHeader);
         HttpResponse<InputStream> response = executeQuery(request);
-        InputStream in = HttpLib.getInputStream(response);
+        InputStream in = registerInputStream(response);
         // Don't assume the endpoint actually gives back the content type we 
asked for
         String actualContentType = responseHeader(response, 
HttpNames.hContentType);
 
@@ -240,8 +257,6 @@ public class QueryExecHTTP implements QueryExec {
             in = new ByteArrayInputStream(b);
         }
 
-        retainedConnection = in; // This will be closed on close()
-
         if (actualContentType == null || actualContentType.equals(""))
             actualContentType = WebContent.contentTypeResultsXML;
 
@@ -266,10 +281,9 @@ public class QueryExecHTTP implements QueryExec {
     public boolean ask() {
         checkNotClosed();
         check(QueryType.ASK);
-        String thisAcceptHeader = dft(overrideAcceptHeader, askAcceptHeader);
-        HttpRequest request = effectiveHttpRequest(thisAcceptHeader);
+        HttpRequest request = effectiveHttpRequest(askAcceptHeader);
         HttpResponse<InputStream> response = executeQuery(request);
-        InputStream in = HttpLib.getInputStream(response);
+        InputStream in = registerInputStream(response);
 
         String actualContentType = responseHeader(response, 
HttpNames.hContentType);
         httpResponseContentType = actualContentType;
@@ -406,10 +420,10 @@ public class QueryExecHTTP implements QueryExec {
     // ifNoContentType - some wild guess at the content type.
     private Pair<InputStream, Lang> execRdfWorker(String contentType, String 
ifNoContentType) {
         checkNotClosed();
-        String thisAcceptHeader = dft(overrideAcceptHeader, contentType);
+        String thisAcceptHeader = contentType;
         HttpRequest request = effectiveHttpRequest(thisAcceptHeader);
         HttpResponse<InputStream> response = executeQuery(request);
-        InputStream in = HttpLib.getInputStream(response);
+        InputStream in = registerInputStream(response);
 
         // Don't assume the endpoint actually gives back the content type we 
asked for
         String actualContentType = responseHeader(response, 
HttpNames.hContentType);
@@ -437,7 +451,7 @@ public class QueryExecHTTP implements QueryExec {
         String thisAcceptHeader = dft(overrideAcceptHeader, 
WebContent.contentTypeJSON);
         HttpRequest request = effectiveHttpRequest(thisAcceptHeader);
         HttpResponse<InputStream> response = executeQuery(request);
-        InputStream in = HttpLib.getInputStream(response);
+        InputStream in = registerInputStream(response);
         try {
             return JSON.parseAny(in).getAsArray();
         } finally { finishInputStream(in); }
@@ -455,11 +469,6 @@ public class QueryExecHTTP implements QueryExec {
         return x.iterator();
     }
 
-    private void checkNotClosed() {
-        if ( closed )
-            throw new QueryExecException("HTTP QueryExecHTTP has been closed");
-    }
-
     private void check(QueryType queryType) {
         if ( query == null ) {
             // Pass through the queryString.
@@ -590,15 +599,27 @@ public class QueryExecHTTP implements QueryExec {
     }
 
     /**
-     * Execute an HttpRequest.
+     * Execute an HttpRequest and wait for the HttpResponse.
+     * A call to {@link #abort()} interrupts the wait.
      * The response is returned after status code processing so the caller can 
assume the
      * query execution was successful and return 200.
      * Use {@link HttpLib#getInputStream} to access the body.
      */
     private HttpResponse<InputStream> executeQuery(HttpRequest request) {
-        logQuery(queryString, request);
+        checkNotClosed();
+
+        if (future != null) {
+            throw new IllegalStateException("Execution was already started.");
+        }
+
         try {
-            HttpResponse<InputStream> response = execute(httpClient, request);
+            synchronized (abortLock) {
+                checkNotAborted();
+                logQuery(queryString, request);
+                future = HttpLib.executeAsync(httpClient, request);
+            }
+
+            HttpResponse<InputStream> response = 
AsyncHttpRDF.getOrElseThrow(future, request);
             HttpLib.handleHttpStatusCode(response);
             return response;
         } catch (HttpException httpEx) {
@@ -676,22 +697,69 @@ public class QueryExecHTTP implements QueryExec {
     /**
      * Cancel query evaluation
      */
-    public void cancel() {
-        closed = true;
-    }
-
     @Override
     public void abort() {
-        try {
-            close();
-        } catch (Exception ex) {
-            Log.warn(this, "Error during abort", ex);
+        // Setting abort to true causes the next read from
+        // retainedConnectionView (if already created) to
+        // fail with a QueryCancelledException.
+        isAborted = true;
+        if (cancelFutureOnAbort) {
+            cancelFuture(future);
+        }
+    }
+
+    private InputStream registerInputStream(HttpResponse<InputStream> 
httpResponse) {
+        InputStream in = HttpLib.getInputStream(httpResponse);
+        registerInputStream(in);
+        return in;
+    }
+
+    /**
+     * Set the given input stream as the 'retainedConnection' and create a 
corresponding
+     * asynchronously abortable 'retainedConnectionView'. The latter is 
returned.
+     * If execution was already aborted then a {@link QueryCancelledException} 
is raised.
+     */
+    private InputStream registerInputStream(InputStream input) {
+        synchronized (abortLock) {
+            this.retainedConnection = input;
+            // Note: Used ProxyInputStream because the ctor of 
CloseShieldInputStream is deprecated.
+            this.retainedConnectionView = new ProxyInputStream(input) {
+                @Override
+                protected void beforeRead(int n) throws IOException {
+                    checkNotAborted();
+                    super.beforeRead(n);
+                }
+                @Override
+                public void close() {
+                    this.in = ClosedInputStream.INSTANCE;
+                }
+            };
+
+            // If already aborted then bail out before starting the parsers.
+            checkNotAborted();
         }
+        return retainedConnectionView;
     }
 
     @Override
     public void close() {
         closed = true;
+        // No need to handle the future here, because the possible states are:
+        // - Null because no execution was started -> retainedConnection is 
null.
+        // - Cancelled by asynchronous abort       -> retainedConnection is 
null.
+        // - Completed successfully by the same thread that now closes the 
retainedConnection
+        //                                         -> retainedConnection is 
non-null.
+        IOUtils.closeQuietly(retainedConnectionView);
+        closeRetainedConnection();
+    }
+
+    private static void cancelFuture(CompletableFuture<?> future) {
+        if (future != null) {
+            future.cancel(true);
+        }
+    }
+
+    private void closeRetainedConnection() {
         if (retainedConnection != null) {
             try {
                 // This call may take a long time if the response has not been 
consumed
@@ -711,6 +779,16 @@ public class QueryExecHTTP implements QueryExec {
         }
     }
 
+    private void checkNotClosed() {
+        if ( closed )
+            throw new QueryExecException("HTTP QueryExecHTTP has been closed");
+    }
+
+    protected void checkNotAborted() {
+        if ( isAborted )
+            throw new QueryCancelledException();
+    }
+
     @Override
     public boolean isClosed() { return closed; }
 }
diff --git 
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/Service.java 
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/Service.java
index d65db0b21b..f8339401c7 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/Service.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/Service.java
@@ -227,7 +227,7 @@ public class Service {
         // -- End setup
 
         // Build the execution
-        QueryExecHTTP qExec = QueryExecHTTP.newBuilder()
+        try (QueryExecHTTP qExec = QueryExecHTTP.newBuilder()
                 .endpoint(serviceURL)
                 .timeout(timeoutMillis, TimeUnit.MILLISECONDS)
                 .httpHeader(HttpNames.hUserAgent, HttpEnv.UserAgent)
@@ -236,8 +236,8 @@ public class Service {
                 .context(context)
                 .httpClient(httpClient)
                 .sendMode(querySendMode)
-                .build();
-        try {
+                .build()) {
+
             // Detach from the network stream.
             RowSet rowSet = qExec.select().materialize();
             QueryIterator qIter = QueryIterPlainWrapper.create(rowSet);
diff --git 
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java 
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java
index 102b38df2c..a2ebaa1cda 100644
--- 
a/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java
+++ 
b/jena-arq/src/main/java/org/apache/jena/sparql/exec/http/UpdateExecHTTP.java
@@ -54,7 +54,9 @@ public class UpdateExecHTTP implements UpdateExec {
 
     private final Context context;
     private final String service;
-    // Not used private final UpdateRequest update;
+
+    // UpdateRequest as an object - may be null.
+    private final UpdateRequest update;
     private final String updateString;
     private final Map<String, String> httpHeaders;
     private final HttpClient httpClient;
@@ -77,7 +79,7 @@ public class UpdateExecHTTP implements UpdateExec {
                                long timeout, TimeUnit timeoutUnit) {
         this.context = context;
         this.service = serviceURL;
-        //this.update = update;
+        this.update = update;
         // Builder ensures one or the other is set.
         this.updateString = ( updateString != null ) ? updateString : 
update.toString();
         this.httpClient = dft(httpClient, HttpEnv.getDftHttpClient());
@@ -95,6 +97,16 @@ public class UpdateExecHTTP implements UpdateExec {
         return context;
     }
 
+    @Override
+    public UpdateRequest getUpdateRequest() {
+        return update;
+    }
+
+    @Override
+    public String getUpdateRequestString() {
+        return updateString;
+    }
+
     @Override
     public void execute() {
         Params thisParams = Params.create(params);
diff --git a/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java 
b/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java
index 49ede54058..174e921cd8 100644
--- a/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java
+++ b/jena-arq/src/main/java/org/apache/jena/update/UpdateProcessor.java
@@ -26,6 +26,19 @@ import org.apache.jena.sparql.util.Context;
  */
 public interface UpdateProcessor
 {
+    /**
+     * The update request associated with this update execution. May be null.
+     */
+    default public UpdateRequest getUpdateRequest() { return null; }
+
+    /**
+     * The update request as a string. May be null.
+     * The string may contain syntax extensions that can not be parsed by Jena.
+     * If {@link #getUpdateRequest()} is not null then this is a corresponding
+     * string that parses to the same update request.
+     */
+    default public String getUpdateRequestString() { return null; }
+
     /** Execute */
     public void execute() ;
 

Reply via email to