[MARMOTTA-621] implement Java client side support for direct ASK and CONSTRUCT queries
Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/4ab20b3d Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/4ab20b3d Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/4ab20b3d Branch: refs/heads/MARMOTTA-584 Commit: 4ab20b3d874ff1062d4ddb5fb52790d7fe7348a8 Parents: 4a5e588 Author: Sebastian Schaffert <[email protected]> Authored: Sat Feb 13 16:14:03 2016 +0100 Committer: Sebastian Schaffert <[email protected]> Committed: Sat Feb 13 16:14:03 2016 +0100 ---------------------------------------------------------------------- .../ostrich/sail/OstrichSailConnection.java | 97 ++++++++++++++++++-- .../backend/ostrich/OstrichSailRepository.java | 72 +++++++++++++-- 2 files changed, 153 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/marmotta/blob/4ab20b3d/libraries/ostrich/client/src/main/java/org/apache/marmotta/ostrich/sail/OstrichSailConnection.java ---------------------------------------------------------------------- diff --git a/libraries/ostrich/client/src/main/java/org/apache/marmotta/ostrich/sail/OstrichSailConnection.java b/libraries/ostrich/client/src/main/java/org/apache/marmotta/ostrich/sail/OstrichSailConnection.java index e85fdd2..b8f6b27 100644 --- a/libraries/ostrich/client/src/main/java/org/apache/marmotta/ostrich/sail/OstrichSailConnection.java +++ b/libraries/ostrich/client/src/main/java/org/apache/marmotta/ostrich/sail/OstrichSailConnection.java @@ -24,6 +24,7 @@ import info.aduna.iteration.*; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import org.apache.marmotta.ostrich.client.proto.Sail; import org.apache.marmotta.ostrich.client.proto.SailServiceGrpc; @@ -67,7 +68,8 @@ public class OstrichSailConnection extends NotifyingSailConnectionBase { private static Logger log = LoggerFactory.getLogger(OstrichSailConnection.class); private final ManagedChannel channel; - private final SailServiceGrpc.SailServiceBlockingStub stub; + private final SailServiceGrpc.SailServiceBlockingStub blockingSailStub; + private final SparqlServiceGrpc.SparqlServiceBlockingStub blockingSparqlStub; private final SailServiceGrpc.SailServiceStub sailServiceStub; private final SparqlServiceGrpc.SparqlServiceStub sparqlServiceStub; @@ -80,9 +82,10 @@ public class OstrichSailConnection extends NotifyingSailConnectionBase { channel = ManagedChannelBuilder.forAddress(host, port) .usePlaintext(true) .build(); - stub = SailServiceGrpc.newBlockingStub(channel); + blockingSailStub = SailServiceGrpc.newBlockingStub(channel); sailServiceStub = SailServiceGrpc.newStub(channel); sparqlServiceStub = SparqlServiceGrpc.newStub(channel); + blockingSparqlStub = SparqlServiceGrpc.newBlockingStub(channel); updateResponseObserver = new StreamObserver<Sail.UpdateResponse>() { @Override @@ -177,11 +180,21 @@ public class OstrichSailConnection extends NotifyingSailConnectionBase { * @return * @throws SailException */ - public CloseableIteration<? extends BindingSet, QueryEvaluationException> directTupleQuery(String query) throws SailException { + public CloseableIteration<? extends BindingSet, QueryEvaluationException> directTupleQuery(String query, String baseUri) throws SailException { log.info("Committing transaction before querying ..."); commitForQuery(); - Sparql.SparqlRequest request = Sparql.SparqlRequest.newBuilder().setQuery(query).build(); + Sparql.SparqlRequest request; + if (baseUri != null) { + request = Sparql.SparqlRequest.newBuilder() + .setQuery(query) + .setBaseUri(new ProtoURI(baseUri).getMessage()) + .build(); + } else { + request = Sparql.SparqlRequest.newBuilder() + .setQuery(query) + .build(); + } return new ExceptionConvertingIteration<BindingSet, QueryEvaluationException>( new ConvertingIteration<Sparql.SparqlResponse, BindingSet, SailException>( @@ -226,12 +239,80 @@ public class OstrichSailConnection extends NotifyingSailConnectionBase { }; } + /** + * Send a SPARQL query to a backend supporting direct SPARQL evaluation. + * + * @param query + * @return + * @throws SailException + */ + public CloseableIteration<? extends Statement, QueryEvaluationException> directGraphQuery(String query, String baseUri) throws SailException { + log.info("Committing transaction before querying ..."); + commitForQuery(); + + Sparql.SparqlRequest request; + if (baseUri != null) { + request = Sparql.SparqlRequest.newBuilder() + .setQuery(query) + .setBaseUri(new ProtoURI(baseUri).getMessage()) + .build(); + } else { + request = Sparql.SparqlRequest.newBuilder() + .setQuery(query) + .build(); + } + + return new ExceptionConvertingIteration<Statement, QueryEvaluationException>( + new ConvertingIteration<Model.Statement, Statement, SailException>( + new ClosableResponseStream<>(sparqlServiceStub, SparqlServiceGrpc.METHOD_GRAPH_QUERY, request)) { + @Override + protected Statement convert(Model.Statement sourceObject) throws SailException { + return new ProtoStatement(sourceObject); + } + }) { + @Override + protected QueryEvaluationException convert(Exception e) { + return new QueryEvaluationException(e); + } + }; + } + + /** + * Send a SPARQL query to a backend supporting direct SPARQL evaluation. + * + * @param query + * @return + * @throws SailException + */ + public boolean directBooleanQuery(String query, String baseUri) throws SailException { + log.info("Committing transaction before querying ..."); + commitForQuery(); + + Sparql.SparqlRequest request; + if (baseUri != null) { + request = Sparql.SparqlRequest.newBuilder() + .setQuery(query) + .setBaseUri(new ProtoURI(baseUri).getMessage()) + .build(); + } else { + request = Sparql.SparqlRequest.newBuilder() + .setQuery(query) + .build(); + } + + try { + return blockingSparqlStub.askQuery(request).getValue(); + } catch (StatusRuntimeException ex) { + throw new SailException(ex.getMessage()); + } + } + @Override protected CloseableIteration<? extends Resource, SailException> getContextIDsInternal() throws SailException { log.info("Committing transaction before querying ..."); commitForQuery(); - return wrapResourceIterator(stub.getContexts(Empty.getDefaultInstance())); + return wrapResourceIterator(blockingSailStub.getContexts(Empty.getDefaultInstance())); } @Override @@ -272,7 +353,7 @@ public class OstrichSailConnection extends NotifyingSailConnectionBase { } } - Int64Value v = stub.size(builder.build()); + Int64Value v = blockingSailStub.size(builder.build()); return v.getValue(); } @@ -361,7 +442,7 @@ public class OstrichSailConnection extends NotifyingSailConnectionBase { commitForQuery(); Empty pattern = Empty.getDefaultInstance(); - return wrapNamespaceIterator(stub.getNamespaces(pattern)); + return wrapNamespaceIterator(blockingSailStub.getNamespaces(pattern)); } @Override @@ -371,7 +452,7 @@ public class OstrichSailConnection extends NotifyingSailConnectionBase { Model.Namespace pattern = Model.Namespace.newBuilder().setPrefix(prefix).build(); try { - return stub.getNamespace(pattern).getUri(); + return blockingSailStub.getNamespace(pattern).getUri(); } catch (io.grpc.StatusRuntimeException ex) { if (ex.getStatus().getCode() == Status.Code.NOT_FOUND) { return null; http://git-wip-us.apache.org/repos/asf/marmotta/blob/4ab20b3d/platform/backends/marmotta-backend-ostrich/src/main/java/org/apache/marmotta/platform/backend/ostrich/OstrichSailRepository.java ---------------------------------------------------------------------- diff --git a/platform/backends/marmotta-backend-ostrich/src/main/java/org/apache/marmotta/platform/backend/ostrich/OstrichSailRepository.java b/platform/backends/marmotta-backend-ostrich/src/main/java/org/apache/marmotta/platform/backend/ostrich/OstrichSailRepository.java index 0dbf1e8..684ae5b 100644 --- a/platform/backends/marmotta-backend-ostrich/src/main/java/org/apache/marmotta/platform/backend/ostrich/OstrichSailRepository.java +++ b/platform/backends/marmotta-backend-ostrich/src/main/java/org/apache/marmotta/platform/backend/ostrich/OstrichSailRepository.java @@ -19,16 +19,13 @@ package org.apache.marmotta.platform.backend.ostrich; import info.aduna.iteration.CloseableIteration; import org.apache.marmotta.ostrich.sail.OstrichSailConnection; +import org.openrdf.model.Statement; import org.openrdf.query.*; +import org.openrdf.query.impl.GraphQueryResultImpl; import org.openrdf.query.impl.TupleQueryResultImpl; -import org.openrdf.query.parser.ParsedQuery; -import org.openrdf.query.parser.ParsedTupleQuery; -import org.openrdf.query.parser.QueryParserUtil; +import org.openrdf.query.parser.*; import org.openrdf.repository.RepositoryException; -import org.openrdf.repository.sail.SailQuery; -import org.openrdf.repository.sail.SailRepository; -import org.openrdf.repository.sail.SailRepositoryConnection; -import org.openrdf.repository.sail.SailTupleQuery; +import org.openrdf.repository.sail.*; import org.openrdf.sail.Sail; import org.openrdf.sail.SailConnection; import org.openrdf.sail.SailException; @@ -37,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.HashMap; /** * A wrapper SailRepository for Ostrich allowing access to direct SPARQL support. @@ -69,7 +67,7 @@ public class OstrichSailRepository extends SailRepository { // Let Sesame still parse the query for better error messages and for the binding names. ParsedTupleQuery parsedQuery = QueryParserUtil.parseTupleQuery(ql, queryString, baseURI); OstrichSailConnection sailCon = findConnection(getConnection().getSailConnection()); - bindingsIter = sailCon.directTupleQuery(queryString); + bindingsIter = sailCon.directTupleQuery(queryString, baseURI); bindingsIter = enforceMaxQueryTime(bindingsIter); return new TupleQueryResultImpl(new ArrayList<String>(parsedQuery.getTupleExpr().getBindingNames()), bindingsIter); @@ -86,11 +84,69 @@ public class OstrichSailRepository extends SailRepository { } @Override + public SailBooleanQuery prepareBooleanQuery(final QueryLanguage ql, final String queryString, final String baseURI) throws MalformedQueryException { + if (ql == QueryLanguage.SPARQL) { + return new SailBooleanQuery(null, this) { + @Override + public boolean evaluate() throws QueryEvaluationException { + try { + log.info("Running native SPARQL query: {}", queryString); + CloseableIteration<? extends BindingSet, QueryEvaluationException> bindingsIter; + + // Let Sesame still parse the query for better error messages and for the binding names. + ParsedBooleanQuery parsedQuery = QueryParserUtil.parseBooleanQuery(ql, queryString, baseURI); + OstrichSailConnection sailCon = findConnection(getConnection().getSailConnection()); + return sailCon.directBooleanQuery(queryString, baseURI); + } catch (SailException e) { + throw new QueryEvaluationException(e.getMessage(), e); + } catch (MalformedQueryException e) { + throw new QueryEvaluationException(e.getMessage(), e); + } + } + }; + } else { + return super.prepareBooleanQuery(ql, queryString, baseURI); + } + } + + @Override + public SailGraphQuery prepareGraphQuery(final QueryLanguage ql, final String queryString, final String baseURI) throws MalformedQueryException { + if (ql == QueryLanguage.SPARQL) { + return new SailGraphQuery(null, this) { + @Override + public GraphQueryResult evaluate() throws QueryEvaluationException { + try { + log.info("Running native SPARQL query: {}", queryString); + CloseableIteration<? extends Statement, ? extends QueryEvaluationException> bindingsIter; + + // Let Sesame still parse the query for better error messages and for the binding names. + ParsedGraphQuery parsedQuery = QueryParserUtil.parseGraphQuery(ql, queryString, baseURI); + OstrichSailConnection sailCon = findConnection(getConnection().getSailConnection()); + bindingsIter = sailCon.directGraphQuery(queryString, baseURI); + + return new GraphQueryResultImpl(new HashMap<String, String>(), bindingsIter); + } catch (SailException e) { + throw new QueryEvaluationException(e.getMessage(), e); + } catch (MalformedQueryException e) { + throw new QueryEvaluationException(e.getMessage(), e); + } + } + }; + } else { + return super.prepareGraphQuery(ql, queryString, baseURI); + } + } + + @Override public SailQuery prepareQuery(QueryLanguage ql, String queryString, String baseURI) throws MalformedQueryException { ParsedQuery parsedQuery = QueryParserUtil.parseQuery(ql, queryString, baseURI); if (parsedQuery instanceof ParsedTupleQuery) { return prepareTupleQuery(ql, queryString, baseURI); + } else if (parsedQuery instanceof ParsedBooleanQuery) { + return prepareBooleanQuery(ql, queryString, baseURI); + } else if (parsedQuery instanceof ParsedGraphQuery) { + return prepareGraphQuery(ql, queryString, baseURI); } else { return super.prepareQuery(ql, queryString, baseURI); }
