This is an automated email from the ASF dual-hosted git repository.
andy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/jena.git
The following commit(s) were added to refs/heads/main by this push:
new f68c511f6b GH-3016: Streamlined methods for starting service executor
chains over and added test cases.
f68c511f6b is described below
commit f68c511f6b22a1caf65c89814a59c882a89b1801
Author: Claus Stadler <[email protected]>
AuthorDate: Tue Feb 18 11:49:27 2025 +0100
GH-3016: Streamlined methods for starting service executor chains over and
added test cases.
---
.../apache/jena/sparql/engine/main/OpExecutor.java | 2 +-
.../apache/jena/sparql/service/ServiceExec.java | 42 +++++++-
.../test/service/TestCustomServiceExecutor.java | 107 +++++++++++++++++++++
3 files changed, 149 insertions(+), 2 deletions(-)
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/OpExecutor.java
b/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/OpExecutor.java
index a659b6c4da..d5e7598ebe 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/OpExecutor.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/OpExecutor.java
@@ -308,7 +308,7 @@ public class OpExecutor {
}
protected QueryIterator execute(OpService opService, QueryIterator input) {
- return ServiceExec.exec(input, opService, execCxt);
+ return ServiceExec.exec(opService, input, execCxt);
}
// Quad form, "GRAPH ?g {}" Flip back to OpGraph.
diff --git
a/jena-arq/src/main/java/org/apache/jena/sparql/service/ServiceExec.java
b/jena-arq/src/main/java/org/apache/jena/sparql/service/ServiceExec.java
index 69f2f30cad..c9d41399ab 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/service/ServiceExec.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/service/ServiceExec.java
@@ -21,23 +21,63 @@ package org.apache.jena.sparql.service;
import org.apache.jena.sparql.algebra.op.OpService;
import org.apache.jena.sparql.engine.ExecutionContext;
import org.apache.jena.sparql.engine.QueryIterator;
+import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.service.bulk.ServiceExecutorBulk;
import org.apache.jena.sparql.service.bulk.ServiceExecutorBulkOverRegistry;
+import org.apache.jena.sparql.service.single.ServiceExecutor;
+import org.apache.jena.sparql.service.single.ServiceExecutorOverRegistry;
import org.apache.jena.sparql.util.Context;
/**
* Entry into the service executor from SPARQL queries.
*/
public class ServiceExec {
+
+ /**
+ * Use {@link #exec(OpService, QueryIterator, ExecutionContext)} whose
parameter order matches that of
+ * {@link ServiceExecutorBulk#createExecution(OpService, QueryIterator,
ExecutionContext)}.
+ */
+ @Deprecated(forRemoval = true, since = "5.4.0")
+ public static QueryIterator exec(QueryIterator input, OpService opService,
ExecutionContext execCxt) {
+ return exec(input, opService, execCxt);
+ }
+
/**
* Execute an OpService w.r.t. the execCxt's service executor registry.
* This is the route from OpExecutor.
+ *
+ * This method can also be used to pass a modified request through the
whole
+ * service executor chain, as exemplified below:
+ * <pre>{@code
+ * ServiceExecutorRegistry.get().addBulkLink((opService, input, execCxt,
chain) -> {
+ * if (canHandle(opService)) {
+ * OpService modifiedOp = modifyOp(opService);
+ * // Forward the request to the beginning of the chain.
+ * return ServiceExec.exec(modifiedOp, input, execCxt);
+ * } else {
+ * // Forward the request to the remaining handlers in the chain.
+ * return chain.createExecution(opService, input, execCxt);
+ * }
+ * });
+ * }</pre>
*/
- public static QueryIterator exec(QueryIterator input, OpService opService,
ExecutionContext execCxt) {
+ public static QueryIterator exec(OpService opService, QueryIterator input,
ExecutionContext execCxt) {
Context cxt = execCxt.getContext();
ServiceExecutorRegistry registry =
ServiceExecutorRegistry.chooseRegistry(cxt);
ServiceExecutorBulk serviceExecutor = new
ServiceExecutorBulkOverRegistry(registry);
QueryIterator qIter = serviceExecutor.createExecution(opService,
input, execCxt);
return qIter;
}
+
+ /**
+ * Execute an OpService w.r.t. the execCxt's service executor registry -
+ * concretely its single chain which operates on a per-binding basis.
+ */
+ public static QueryIterator exec(OpService opExecute, OpService original,
Binding binding, ExecutionContext execCxt) {
+ Context cxt = execCxt.getContext();
+ ServiceExecutorRegistry registry =
ServiceExecutorRegistry.chooseRegistry(cxt);
+ ServiceExecutor serviceExecutor = new
ServiceExecutorOverRegistry(registry);
+ QueryIterator qIter = serviceExecutor.createExecution(opExecute,
original, binding, execCxt);
+ return qIter;
+ }
}
diff --git
a/jena-integration-tests/src/test/java/org/apache/jena/test/service/TestCustomServiceExecutor.java
b/jena-integration-tests/src/test/java/org/apache/jena/test/service/TestCustomServiceExecutor.java
index 144547f6b9..c31e267ffb 100644
---
a/jena-integration-tests/src/test/java/org/apache/jena/test/service/TestCustomServiceExecutor.java
+++
b/jena-integration-tests/src/test/java/org/apache/jena/test/service/TestCustomServiceExecutor.java
@@ -25,14 +25,23 @@ import java.util.function.Consumer;
import org.apache.jena.atlas.logging.LogCtl;
import org.apache.jena.query.*;
+import org.apache.jena.graph.Node;
+import org.apache.jena.graph.NodeFactory;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.riot.ResultSetMgr;
import org.apache.jena.riot.resultset.ResultSetLang;
+import org.apache.jena.sparql.ARQConstants;
import org.apache.jena.sparql.algebra.Table;
+import org.apache.jena.sparql.algebra.op.OpService;
+import org.apache.jena.sparql.core.DatasetGraphFactory;
import org.apache.jena.sparql.core.Var;
+import org.apache.jena.sparql.engine.ExecutionContext;
+import org.apache.jena.sparql.engine.QueryIterator;
import org.apache.jena.sparql.engine.binding.Binding;
+import org.apache.jena.sparql.exec.QueryExec;
import org.apache.jena.sparql.resultset.ResultSetCompare;
+import org.apache.jena.sparql.service.ServiceExec;
import org.apache.jena.sparql.service.ServiceExecutorRegistry;
import org.apache.jena.sparql.service.single.ServiceExecutor;
import org.apache.jena.sparql.sse.SSE;
@@ -115,6 +124,104 @@ public class TestCustomServiceExecutor {
}
}
+ /**
+ * A test case where custom executors both forward requests down the chain
as well
+ * as start the chain over using {@link ServiceExec#exec(QueryIterator,
OpService, ExecutionContext)}.
+ *
+ * This test case tests the chain for bulk execution.
+ */
+ @Test
+ public void testRestartServiceChainBulk() {
+ Node a = NodeFactory.createURI("urn:a");
+ Node b = NodeFactory.createURI("urn:b");
+ Node c = NodeFactory.createURI("urn:c");
+
+ // The comments are numbered with the expected order of the execution
flow.
+
+ ServiceExecutorRegistry reg = new ServiceExecutorRegistry();
+ reg.addBulkLink((opService, input, execCxt, chain) -> {
+ Node node = opService.getService();
+ if (node.equals(a)) {
+ // 2. Match 'a' and restart the chain with 'b'
+ return ServiceExec.exec(new OpService(b, opService.getSubOp(),
false), input, execCxt);
+ } else if (node.equals(c)) {
+ // 4. Match 'c' and return the test table
+ return table.iterator(execCxt);
+ } else {
+ throw new RuntimeException("Unexpectedly got: " + node);
+ }
+ });
+
+ reg.addBulkLink((opService, input, execCxt, chain) -> {
+ Node node = opService.getService();
+ if (node.equals(a)) {
+ // 1. Match 'a' and forward 'a'
+ return chain.createExecution(opService, input, execCxt);
+ } else if (node.equals(b)) {
+ // 3. Match 'b' and forward 'c'
+ return chain.createExecution(new OpService(c,
opService.getSubOp(), false), input, execCxt);
+ } else {
+ throw new RuntimeException("Unexpectedly got: " + node);
+ }
+ });
+
+ // 0. Start the test with 'a'
+ Table actualTable = QueryExec.dataset(DatasetGraphFactory.empty())
+ .query("SELECT ?s ?p ?o { SERVICE <urn:a> { } }")
+ .set(ARQConstants.registryServiceExecutors, reg)
+ .table();
+ Assert.assertEquals(table, actualTable);
+ }
+
+ /**
+ * A test case where custom executors both forward requests down the chain
as well
+ * as start the chain over using {@link ServiceExec#exec(OpService,
OpService, Binding, ExecutionContext)}.
+ *
+ * This test case tests the chain for single binding execution.
+ */
+ @Test
+ public void testRestartServiceChainSingle() {
+ Node a = NodeFactory.createURI("urn:a");
+ Node b = NodeFactory.createURI("urn:b");
+ Node c = NodeFactory.createURI("urn:c");
+
+ // The comments are numbered with the expected order of the execution
flow.
+
+ ServiceExecutorRegistry reg = new ServiceExecutorRegistry();
+ reg.addSingleLink((opExecute, opOriginal, binding, execCxt, chain) -> {
+ Node node = opExecute.getService();
+ if (node.equals(a)) {
+ // 2. Match 'a' and restart the chain with 'b'
+ return ServiceExec.exec(new OpService(b, opExecute.getSubOp(),
false), opOriginal, binding, execCxt);
+ } else if (node.equals(c)) {
+ // 4. Match 'c' and return the test table
+ return table.iterator(execCxt);
+ } else {
+ throw new RuntimeException("Unexpectedly got: " + node);
+ }
+ });
+
+ reg.addSingleLink((opExecute, opOriginal, binding, execCxt, chain) -> {
+ Node node = opExecute.getService();
+ if (node.equals(a)) {
+ // 1. Match 'a' and forward 'a'
+ return chain.createExecution(opExecute, opOriginal, binding,
execCxt);
+ } else if (node.equals(b)) {
+ // 3. Match 'b' and forward 'c'
+ return chain.createExecution(new OpService(c,
opExecute.getSubOp(), false), opOriginal, binding, execCxt);
+ } else {
+ throw new RuntimeException("Unexpectedly got: " + node);
+ }
+ });
+
+ // 0. Start the test with 'a'
+ Table actualTable = QueryExec.dataset(DatasetGraphFactory.empty())
+ .query("SELECT ?s ?p ?o { SERVICE <urn:a> { } }")
+ .set(ARQConstants.registryServiceExecutors, reg)
+ .table();
+ Assert.assertEquals(table, actualTable);
+ }
+
// Check to rule out interference with conventional access to remote
endpoints.
// Uncommenting @Test will print out data from the remote endpoint
//@Test