This is an automated email from the ASF dual-hosted git repository.

andy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/jena.git


The following commit(s) were added to refs/heads/main by this push:
     new f68c511f6b GH-3016: Streamlined methods for starting service executor 
chains over and added test cases.
f68c511f6b is described below

commit f68c511f6b22a1caf65c89814a59c882a89b1801
Author: Claus Stadler <[email protected]>
AuthorDate: Tue Feb 18 11:49:27 2025 +0100

    GH-3016: Streamlined methods for starting service executor chains over and 
added test cases.
---
 .../apache/jena/sparql/engine/main/OpExecutor.java |   2 +-
 .../apache/jena/sparql/service/ServiceExec.java    |  42 +++++++-
 .../test/service/TestCustomServiceExecutor.java    | 107 +++++++++++++++++++++
 3 files changed, 149 insertions(+), 2 deletions(-)

diff --git 
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/OpExecutor.java 
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/OpExecutor.java
index a659b6c4da..d5e7598ebe 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/OpExecutor.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/OpExecutor.java
@@ -308,7 +308,7 @@ public class OpExecutor {
     }
 
     protected QueryIterator execute(OpService opService, QueryIterator input) {
-        return ServiceExec.exec(input, opService, execCxt);
+        return ServiceExec.exec(opService, input, execCxt);
     }
 
     // Quad form, "GRAPH ?g {}" Flip back to OpGraph.
diff --git 
a/jena-arq/src/main/java/org/apache/jena/sparql/service/ServiceExec.java 
b/jena-arq/src/main/java/org/apache/jena/sparql/service/ServiceExec.java
index 69f2f30cad..c9d41399ab 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/service/ServiceExec.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/service/ServiceExec.java
@@ -21,23 +21,63 @@ package org.apache.jena.sparql.service;
 import org.apache.jena.sparql.algebra.op.OpService;
 import org.apache.jena.sparql.engine.ExecutionContext;
 import org.apache.jena.sparql.engine.QueryIterator;
+import org.apache.jena.sparql.engine.binding.Binding;
 import org.apache.jena.sparql.service.bulk.ServiceExecutorBulk;
 import org.apache.jena.sparql.service.bulk.ServiceExecutorBulkOverRegistry;
+import org.apache.jena.sparql.service.single.ServiceExecutor;
+import org.apache.jena.sparql.service.single.ServiceExecutorOverRegistry;
 import org.apache.jena.sparql.util.Context;
 
 /**
  * Entry into the service executor from SPARQL queries.
  */
 public class ServiceExec {
+
+    /**
+     * Use {@link #exec(OpService, QueryIterator, ExecutionContext)} whose 
parameter order matches that of
+     * {@link ServiceExecutorBulk#createExecution(OpService, QueryIterator, 
ExecutionContext)}.
+     */
+    @Deprecated(forRemoval = true, since = "5.4.0")
+    public static QueryIterator exec(QueryIterator input, OpService opService, 
ExecutionContext execCxt) {
+        return exec(input, opService, execCxt);
+    }
+
     /**
      * Execute an OpService w.r.t. the execCxt's service executor registry.
      * This is the route from OpExecutor.
+     *
+     * This method can also be used to pass a modified request through the 
whole
+     * service executor chain, as exemplified below:
+     * <pre>{@code
+     * ServiceExecutorRegistry.get().addBulkLink((opService, input, execCxt, 
chain) -> {
+     *     if (canHandle(opService)) {
+     *         OpService modifiedOp = modifyOp(opService);
+     *         // Forward the request to the beginning of the chain.
+     *         return ServiceExec.exec(modifiedOp, input, execCxt);
+     *     } else {
+     *         // Forward the request to the remaining handlers in the chain.
+     *         return chain.createExecution(opService, input, execCxt);
+     *     }
+     * });
+     * }</pre>
      */
-    public static QueryIterator exec(QueryIterator input, OpService opService, 
ExecutionContext execCxt) {
+    public static QueryIterator exec(OpService opService, QueryIterator input, 
ExecutionContext execCxt) {
         Context cxt = execCxt.getContext();
         ServiceExecutorRegistry registry = 
ServiceExecutorRegistry.chooseRegistry(cxt);
         ServiceExecutorBulk serviceExecutor = new 
ServiceExecutorBulkOverRegistry(registry);
         QueryIterator qIter = serviceExecutor.createExecution(opService, 
input, execCxt);
         return qIter;
     }
+
+    /**
+     * Execute an OpService w.r.t. the execCxt's service executor registry -
+     * concretely its single chain which operates on a per-binding basis.
+     */
+    public static QueryIterator exec(OpService opExecute, OpService original, 
Binding binding, ExecutionContext execCxt) {
+        Context cxt = execCxt.getContext();
+        ServiceExecutorRegistry registry = 
ServiceExecutorRegistry.chooseRegistry(cxt);
+        ServiceExecutor serviceExecutor = new 
ServiceExecutorOverRegistry(registry);
+        QueryIterator qIter = serviceExecutor.createExecution(opExecute, 
original, binding, execCxt);
+        return qIter;
+    }
 }
diff --git 
a/jena-integration-tests/src/test/java/org/apache/jena/test/service/TestCustomServiceExecutor.java
 
b/jena-integration-tests/src/test/java/org/apache/jena/test/service/TestCustomServiceExecutor.java
index 144547f6b9..c31e267ffb 100644
--- 
a/jena-integration-tests/src/test/java/org/apache/jena/test/service/TestCustomServiceExecutor.java
+++ 
b/jena-integration-tests/src/test/java/org/apache/jena/test/service/TestCustomServiceExecutor.java
@@ -25,14 +25,23 @@ import java.util.function.Consumer;
 
 import org.apache.jena.atlas.logging.LogCtl;
 import org.apache.jena.query.*;
+import org.apache.jena.graph.Node;
+import org.apache.jena.graph.NodeFactory;
 import org.apache.jena.rdf.model.Model;
 import org.apache.jena.rdf.model.ModelFactory;
 import org.apache.jena.riot.ResultSetMgr;
 import org.apache.jena.riot.resultset.ResultSetLang;
+import org.apache.jena.sparql.ARQConstants;
 import org.apache.jena.sparql.algebra.Table;
+import org.apache.jena.sparql.algebra.op.OpService;
+import org.apache.jena.sparql.core.DatasetGraphFactory;
 import org.apache.jena.sparql.core.Var;
+import org.apache.jena.sparql.engine.ExecutionContext;
+import org.apache.jena.sparql.engine.QueryIterator;
 import org.apache.jena.sparql.engine.binding.Binding;
+import org.apache.jena.sparql.exec.QueryExec;
 import org.apache.jena.sparql.resultset.ResultSetCompare;
+import org.apache.jena.sparql.service.ServiceExec;
 import org.apache.jena.sparql.service.ServiceExecutorRegistry;
 import org.apache.jena.sparql.service.single.ServiceExecutor;
 import org.apache.jena.sparql.sse.SSE;
@@ -115,6 +124,104 @@ public class TestCustomServiceExecutor {
         }
     }
 
+    /**
+     * A test case where custom executors both forward requests down the chain 
as well
+     * as start the chain over using {@link ServiceExec#exec(QueryIterator, 
OpService, ExecutionContext)}.
+     *
+     * This test case tests the chain for bulk execution.
+     */
+    @Test
+    public void testRestartServiceChainBulk() {
+        Node a = NodeFactory.createURI("urn:a");
+        Node b = NodeFactory.createURI("urn:b");
+        Node c = NodeFactory.createURI("urn:c");
+
+        // The comments are numbered with the expected order of the execution 
flow.
+
+        ServiceExecutorRegistry reg = new ServiceExecutorRegistry();
+        reg.addBulkLink((opService, input, execCxt, chain) -> {
+            Node node = opService.getService();
+            if (node.equals(a)) {
+                // 2. Match 'a' and restart the chain with 'b'
+                return ServiceExec.exec(new OpService(b, opService.getSubOp(), 
false), input, execCxt);
+            } else if (node.equals(c)) {
+                // 4. Match 'c' and return the test table
+                return table.iterator(execCxt);
+            } else {
+                throw new RuntimeException("Unexpectedly got: " + node);
+            }
+        });
+
+        reg.addBulkLink((opService, input, execCxt, chain) -> {
+            Node node = opService.getService();
+            if (node.equals(a)) {
+                // 1. Match 'a' and forward 'a'
+                return chain.createExecution(opService, input, execCxt);
+            } else if (node.equals(b)) {
+                // 3. Match 'b' and forward 'c'
+                return chain.createExecution(new OpService(c, 
opService.getSubOp(), false), input, execCxt);
+            } else {
+                throw new RuntimeException("Unexpectedly got: " + node);
+            }
+        });
+
+        // 0. Start the test with 'a'
+        Table actualTable = QueryExec.dataset(DatasetGraphFactory.empty())
+            .query("SELECT ?s ?p ?o { SERVICE <urn:a> { } }")
+            .set(ARQConstants.registryServiceExecutors, reg)
+            .table();
+        Assert.assertEquals(table, actualTable);
+    }
+
+    /**
+     * A test case where custom executors both forward requests down the chain 
as well
+     * as start the chain over using {@link ServiceExec#exec(OpService, 
OpService, Binding, ExecutionContext)}.
+     *
+     * This test case tests the chain for single binding execution.
+     */
+    @Test
+    public void testRestartServiceChainSingle() {
+        Node a = NodeFactory.createURI("urn:a");
+        Node b = NodeFactory.createURI("urn:b");
+        Node c = NodeFactory.createURI("urn:c");
+
+        // The comments are numbered with the expected order of the execution 
flow.
+
+        ServiceExecutorRegistry reg = new ServiceExecutorRegistry();
+        reg.addSingleLink((opExecute, opOriginal, binding, execCxt, chain) -> {
+            Node node = opExecute.getService();
+            if (node.equals(a)) {
+                // 2. Match 'a' and restart the chain with 'b'
+                return ServiceExec.exec(new OpService(b, opExecute.getSubOp(), 
false), opOriginal, binding, execCxt);
+            } else if (node.equals(c)) {
+                // 4. Match 'c' and return the test table
+                return table.iterator(execCxt);
+            } else {
+                throw new RuntimeException("Unexpectedly got: " + node);
+            }
+        });
+
+        reg.addSingleLink((opExecute, opOriginal, binding, execCxt, chain) -> {
+            Node node = opExecute.getService();
+            if (node.equals(a)) {
+                // 1. Match 'a' and forward 'a'
+                return chain.createExecution(opExecute, opOriginal, binding, 
execCxt);
+            } else if (node.equals(b)) {
+                // 3. Match 'b' and forward 'c'
+                return chain.createExecution(new OpService(c, 
opExecute.getSubOp(), false), opOriginal, binding, execCxt);
+            } else {
+                throw new RuntimeException("Unexpectedly got: " + node);
+            }
+        });
+
+        // 0. Start the test with 'a'
+        Table actualTable = QueryExec.dataset(DatasetGraphFactory.empty())
+            .query("SELECT ?s ?p ?o { SERVICE <urn:a> { } }")
+            .set(ARQConstants.registryServiceExecutors, reg)
+            .table();
+        Assert.assertEquals(table, actualTable);
+    }
+
     // Check to rule out interference with conventional access to remote 
endpoints.
     // Uncommenting @Test will print out data from the remote endpoint
     //@Test

Reply via email to