Repository: marmotta Updated Branches: refs/heads/develop da96706ce -> 89df9e039
SPARQL: code simplification Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/89df9e03 Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/89df9e03 Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/89df9e03 Branch: refs/heads/develop Commit: 89df9e039b8a5168d7277e51cd84469be6254466 Parents: da96706 Author: Sebastian Schaffert <[email protected]> Authored: Fri Nov 7 15:42:08 2014 +0100 Committer: Sebastian Schaffert <[email protected]> Committed: Fri Nov 7 15:42:08 2014 +0100 ---------------------------------------------------------------------- .../evaluation/KiWiEvaluationStrategy.java | 409 +++++++++++++++++++ .../evaluation/KiWiEvaluationStrategyImpl.java | 202 --------- .../persistence/KiWiSparqlConnection.java | 274 ------------- .../kiwi/sparql/sail/KiWiSparqlSail.java | 7 +- .../sparql/sail/KiWiSparqlSailConnection.java | 10 +- 5 files changed, 415 insertions(+), 487 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/marmotta/blob/89df9e03/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/evaluation/KiWiEvaluationStrategy.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/evaluation/KiWiEvaluationStrategy.java b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/evaluation/KiWiEvaluationStrategy.java new file mode 100644 index 0000000..122edfa --- /dev/null +++ b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/evaluation/KiWiEvaluationStrategy.java @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.marmotta.kiwi.sparql.evaluation; + +import info.aduna.iteration.*; +import org.apache.marmotta.commons.vocabulary.XSD; +import org.apache.marmotta.kiwi.model.rdf.KiWiNode; +import org.apache.marmotta.kiwi.persistence.KiWiConnection; +import org.apache.marmotta.kiwi.persistence.util.ResultSetIteration; +import org.apache.marmotta.kiwi.persistence.util.ResultTransformerFunction; +import org.apache.marmotta.kiwi.sail.KiWiValueFactory; +import org.apache.marmotta.kiwi.sparql.builder.ProjectionType; +import org.apache.marmotta.kiwi.sparql.builder.SQLBuilder; +import org.apache.marmotta.kiwi.sparql.builder.collect.SupportedFinder; +import org.apache.marmotta.kiwi.sparql.builder.model.SQLVariable; +import org.apache.marmotta.kiwi.sparql.exception.UnsatisfiableQueryException; +import org.openrdf.model.URI; +import org.openrdf.model.impl.BNodeImpl; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.*; +import org.openrdf.query.algebra.*; +import org.openrdf.query.algebra.evaluation.TripleSource; +import org.openrdf.query.algebra.evaluation.impl.EvaluationStrategyImpl; +import org.openrdf.query.impl.MapBindingSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import 
java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.*; + +/** + * An implementation of the SPARQL query evaluation strategy with specific extensions and optimizations. The KiWi + * evaluation strategy is able to apply optimizations to certain frequently found query patterns by directly translating + * them into SQL queries. Currently, the following constructs are supported: + * <ul> + * <li>JOINs of statement patterns are translated into SQL joins (no OPTIONAL and no path expressions supported)</li> + * <li>FILTERs are translated to SQL where conditions, in case the FILTER conditions are supported (no aggregation constructs are supported)</li> + * </ul> + * In case a query is not completely supported by the optimizer, the optimizer might still improve performance by + * evaluating the optimizable components of the query and then letting the in-memory implementation take over + * (e.g. for aggregation constructs, distinct, path expressions, optional). + * + * @author Sebastian Schaffert ([email protected]) + */ +public class KiWiEvaluationStrategy extends EvaluationStrategyImpl{ + + private static Logger log = LoggerFactory.getLogger(KiWiEvaluationStrategy.class); + + + /** + * The database connection offering specific SPARQL-SQL optimizations. 
+ */ + private KiWiConnection connection; + private KiWiValueFactory valueFactory; + private ExecutorService executorService; + + + private Set<String> projectedVars = new HashSet<>(); + + public KiWiEvaluationStrategy(TripleSource tripleSource, KiWiConnection connection, KiWiValueFactory valueFactory) { + super(tripleSource); + this.connection = connection; + this.valueFactory = valueFactory; + + // interruptible queries run in a separate thread + this.executorService = Executors.newCachedThreadPool(); + } + + public KiWiEvaluationStrategy(TripleSource tripleSource, Dataset dataset, KiWiConnection connection, KiWiValueFactory valueFactory) { + super(tripleSource, dataset); + this.connection = connection; + this.valueFactory = valueFactory; + + // interruptible queries run in a separate thread + this.executorService = Executors.newCachedThreadPool(); + } + + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Projection projection, BindingSet bindings) throws QueryEvaluationException { + // count projected variables + if(isSupported(projection.getArg())) { + for (ProjectionElem elem : projection.getProjectionElemList().getElements()) { + projectedVars.add(elem.getSourceName()); + } + } + + return super.evaluate(projection, bindings); + } + + + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Union union, BindingSet bindings) throws QueryEvaluationException { + if(isSupported(union)) { + return evaluateNative(union, bindings); + } else { + return super.evaluate(union, bindings); + } + } + + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Extension order, BindingSet bindings) throws QueryEvaluationException { + if(isSupported(order)) { + return evaluateNative(order, bindings); + } else { + return super.evaluate(order, bindings); + } + } + + + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Order order, BindingSet bindings) 
throws QueryEvaluationException { + if(isSupported(order)) { + return evaluateNative(order, bindings); + } else { + return super.evaluate(order, bindings); + } + } + + + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(LeftJoin join, BindingSet bindings) throws QueryEvaluationException { + if(isSupported(join)) { + return evaluateNative(join, bindings); + } else { + return super.evaluate(join, bindings); + } + } + + + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Join join, BindingSet bindings) throws QueryEvaluationException { + if(isSupported(join)) { + return evaluateNative(join, bindings); + } else { + return super.evaluate(join, bindings); + } + } + + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Filter join, BindingSet bindings) throws QueryEvaluationException { + if(isSupported(join)) { + return evaluateNative(join, bindings); + } else { + return super.evaluate(join, bindings); + } + } + + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Slice slice, BindingSet bindings) throws QueryEvaluationException { + if(isSupported(slice)) { + return evaluateNative(slice, bindings); + } else { + return super.evaluate(slice, bindings); + } + } + + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Reduced reduced, BindingSet bindings) throws QueryEvaluationException { + if(isSupported(reduced)) { + return evaluateNative(reduced, bindings); + } else { + return super.evaluate(reduced, bindings); + } + } + + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Distinct distinct, BindingSet bindings) throws QueryEvaluationException { + if(isSupported(distinct)) { + return evaluateNative(distinct, bindings); + } else { + return super.evaluate(distinct, bindings); + } + } + + /** + * Evaluate a statement pattern join or filter on the database by translating it 
into an appropriate SQL statement. + * Copied and adapted from KiWiReasoningConnection.query() + * + * @param join + * @return + */ + public CloseableIteration<BindingSet, QueryEvaluationException> evaluateNative(TupleExpr join, final BindingSet bindings) throws QueryEvaluationException { + log.debug("applying KiWi native optimizations on SPARQL query ..."); + + try { + final SQLBuilder builder = new SQLBuilder(join, bindings, dataset, valueFactory, connection.getDialect(), projectedVars); + + final PreparedStatement queryStatement = connection.getJDBCConnection().prepareStatement(builder.build().toString()); + if (connection.getDialect().isCursorSupported()) { + queryStatement.setFetchSize(connection.getConfiguration().getCursorSize()); + } + + Future<ResultSet> queryFuture = + executorService.submit(new Callable<ResultSet>() { + @Override + public ResultSet call() throws Exception { + try { + return queryStatement.executeQuery(); + } catch (SQLException ex) { + if (Thread.interrupted()) { + log.info("SQL query execution cancelled; not returning result (Thread={})", Thread.currentThread()); + throw new InterruptedException("SPARQL query execution cancelled"); + } else { + throw ex; + } + } + } + } + ); + + try { + ResultSet result = queryFuture.get(); + + ResultSetIteration<BindingSet> it = new ResultSetIteration<BindingSet>(result, true, new ResultTransformerFunction<BindingSet>() { + @Override + public BindingSet apply(ResultSet row) throws SQLException { + MapBindingSet resultRow = new MapBindingSet(); + + List<SQLVariable> vars = new ArrayList<>(builder.getVariables().values()); + + long[] nodeIds = new long[vars.size()]; + for(int i=0; i<vars.size(); i++) { + SQLVariable sv = vars.get(i); + if(sv.getProjectionType() == ProjectionType.NODE && (builder.getProjectedVars().isEmpty() || builder.getProjectedVars().contains(sv.getSparqlName()))) { + nodeIds[i] = row.getLong(sv.getName()); + } + } + KiWiNode[] nodes = connection.loadNodesByIds(nodeIds); + + for (int 
i = 0; i < vars.size(); i++) { + SQLVariable sv = vars.get(i); + if(nodes[i] != null) { + // resolved node + resultRow.addBinding(sv.getSparqlName(), nodes[i]); + } else if(sv.getProjectionType() != ProjectionType.NONE && (builder.getProjectedVars().isEmpty() || builder.getProjectedVars().contains(sv.getSparqlName()))) { + // literal value + String svalue; + switch (sv.getProjectionType()) { + case URI: + svalue = row.getString(sv.getName()); + if(svalue != null) + resultRow.addBinding(sv.getSparqlName(), new URIImpl(svalue)); + break; + case BNODE: + svalue = row.getString(sv.getName()); + if(svalue != null) + resultRow.addBinding(sv.getSparqlName(), new BNodeImpl(svalue)); + break; + case INT: + if(row.getObject(sv.getName()) != null) { + svalue = Integer.toString(row.getInt(sv.getName())); + URI type = XSD.Integer; + try { + long typeId = row.getLong(sv.getName() + "_TYPE"); + if (typeId > 0) + type = (URI) connection.loadNodeById(typeId); + } catch (SQLException ex) { + } + + resultRow.addBinding(sv.getSparqlName(), new LiteralImpl(svalue, type)); + } + break; + case DOUBLE: + if(row.getObject(sv.getName()) != null) { + svalue = Double.toString(row.getDouble(sv.getName())); + URI type = XSD.Double; + try { + long typeId = row.getLong(sv.getName() + "_TYPE"); + if (typeId > 0) + type = (URI) connection.loadNodeById(typeId); + } catch (SQLException ex) { + } + + resultRow.addBinding(sv.getSparqlName(), new LiteralImpl(svalue, type)); + } + break; + case BOOL: + if(row.getObject(sv.getName()) != null) { + svalue = Boolean.toString(row.getBoolean(sv.getName())); + resultRow.addBinding(sv.getSparqlName(), new LiteralImpl(svalue.toLowerCase(), XSD.Boolean)); + } + break; + case STRING: + default: + svalue = row.getString(sv.getName()); + + if(svalue != null) { + + // retrieve optional type and language information, because string functions + // need to preserve this in certain cases, even when constructing new literals + String lang = null; + try { + lang = 
row.getString(sv.getName() + "_LANG"); + } catch (SQLException ex) { + } + + URI type = null; + try { + long typeId = row.getLong(sv.getName() + "_TYPE"); + if (typeId > 0) + type = (URI) connection.loadNodeById(typeId); + } catch (SQLException ex) { + } + + if (lang != null) { + if (svalue.length() > 0) { + resultRow.addBinding(sv.getSparqlName(), new LiteralImpl(svalue, lang)); + } else { + // string functions that return empty literal should yield no type or language + resultRow.addBinding(sv.getSparqlName(), new LiteralImpl("")); + } + } else if (type != null) { + if(type.stringValue().equals(XSD.String.stringValue())) { + // string functions on other datatypes than string should yield no binding + if (svalue.length() > 0) { + resultRow.addBinding(sv.getSparqlName(), new LiteralImpl(svalue, type)); + } else { + // string functions that return empty literal should yield no type or language + resultRow.addBinding(sv.getSparqlName(), new LiteralImpl("")); + } + } + } else { + resultRow.addBinding(sv.getSparqlName(), new LiteralImpl(svalue)); + } + + } + break; + } + } + } + + + if (bindings != null) { + for (Binding binding : bindings) { + resultRow.addBinding(binding); + } + } + return resultRow; + } + }); + + + return new ExceptionConvertingIteration<BindingSet, QueryEvaluationException>(new CloseableIteratorIteration<BindingSet, SQLException>(Iterations.asList(it).iterator())) { + @Override + protected QueryEvaluationException convert(Exception e) { + return new QueryEvaluationException(e); + } + }; + + } catch (InterruptedException | CancellationException e) { + log.info("SPARQL query execution cancelled"); + queryFuture.cancel(true); + queryStatement.cancel(); + queryStatement.close(); + + throw new QueryInterruptedException("SPARQL query execution cancelled"); + } catch (ExecutionException e) { + log.error("error executing SPARQL query", e.getCause()); + if (e.getCause() instanceof SQLException) { + throw new QueryEvaluationException(e.getCause()); + } else 
if (e.getCause() instanceof InterruptedException) { + throw new QueryInterruptedException(e.getCause()); + } else { + throw new QueryEvaluationException("error executing SPARQL query", e); + } + } + } catch (SQLException e) { + throw new QueryEvaluationException(e); + } catch (IllegalArgumentException e) { + throw new QueryEvaluationException(e); + } catch (UnsatisfiableQueryException ex) { + return new EmptyIteration<>(); + } + } + + + /** + * Test if a tuple expression is supported by the optimized evaluation; in this case we can apply a specific optimization. + * + * @param expr + * @return + */ + private boolean isSupported(TupleExpr expr) { + return new SupportedFinder(expr, connection.getDialect()).isSupported(); + } + +} http://git-wip-us.apache.org/repos/asf/marmotta/blob/89df9e03/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/evaluation/KiWiEvaluationStrategyImpl.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/evaluation/KiWiEvaluationStrategyImpl.java b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/evaluation/KiWiEvaluationStrategyImpl.java deleted file mode 100644 index 7e6d7bb..0000000 --- a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/evaluation/KiWiEvaluationStrategyImpl.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.marmotta.kiwi.sparql.evaluation; - -import info.aduna.iteration.CloseableIteration; -import info.aduna.iteration.ExceptionConvertingIteration; -import org.apache.marmotta.kiwi.sparql.builder.collect.SupportedFinder; -import org.apache.marmotta.kiwi.sparql.persistence.KiWiSparqlConnection; -import org.openrdf.query.BindingSet; -import org.openrdf.query.Dataset; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.QueryInterruptedException; -import org.openrdf.query.algebra.*; -import org.openrdf.query.algebra.evaluation.TripleSource; -import org.openrdf.query.algebra.evaluation.impl.EvaluationStrategyImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.SQLException; -import java.util.HashSet; -import java.util.Set; - -/** - * An implementation of the SPARQL query evaluation strategy with specific extensions and optimizations. The KiWi - * evaluation strategy is able to apply optimizations to certain frequently found query patterns by directly translating - * them into SQL queries. 
Currently, the following constructs are supported: - * <ul> - * <li>JOINs of statement patterns are translated into SQL joins (no OPTIONAL and no path expressions supporterd)</li> - * <li>FILTERs are translated to SQL where conditions, in case the FILTER conditions are supported (no aggregation constructs are supported)</li> - * </ul> - * In case a query is not completely supported by the optimizer, the optimizer might still improve performance by - * evaluating the optimizable components of the query and then letting the in-memory implementation take over - * (e.g. for aggregation constructs, distinct, path expressions, optional). - * - * @author Sebastian Schaffert ([email protected]) - */ -public class KiWiEvaluationStrategyImpl extends EvaluationStrategyImpl{ - - private static Logger log = LoggerFactory.getLogger(KiWiEvaluationStrategyImpl.class); - - - /** - * The database connection offering specific SPARQL-SQL optimizations. - */ - private KiWiSparqlConnection connection; - - private Set<String> projectedVars = new HashSet<>(); - - public KiWiEvaluationStrategyImpl(TripleSource tripleSource, KiWiSparqlConnection connection) { - super(tripleSource); - this.connection = connection; - } - - public KiWiEvaluationStrategyImpl(TripleSource tripleSource, Dataset dataset, KiWiSparqlConnection connection) { - super(tripleSource, dataset); - this.connection = connection; - } - - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Projection projection, BindingSet bindings) throws QueryEvaluationException { - // count projected variables - if(isSupported(projection.getArg())) { - for (ProjectionElem elem : projection.getProjectionElemList().getElements()) { - projectedVars.add(elem.getSourceName()); - } - } - - return super.evaluate(projection, bindings); - } - - - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Union union, BindingSet bindings) throws QueryEvaluationException { - 
if(isSupported(union)) { - return evaluateNative(union, bindings); - } else { - return super.evaluate(union, bindings); - } - } - - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Extension order, BindingSet bindings) throws QueryEvaluationException { - if(isSupported(order)) { - return evaluateNative(order, bindings); - } else { - return super.evaluate(order, bindings); - } - } - - - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Order order, BindingSet bindings) throws QueryEvaluationException { - if(isSupported(order)) { - return evaluateNative(order, bindings); - } else { - return super.evaluate(order, bindings); - } - } - - - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(LeftJoin join, BindingSet bindings) throws QueryEvaluationException { - if(isSupported(join)) { - return evaluateNative(join, bindings); - } else { - return super.evaluate(join, bindings); - } - } - - - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Join join, BindingSet bindings) throws QueryEvaluationException { - if(isSupported(join)) { - return evaluateNative(join, bindings); - } else { - return super.evaluate(join, bindings); - } - } - - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Filter join, BindingSet bindings) throws QueryEvaluationException { - if(isSupported(join)) { - return evaluateNative(join, bindings); - } else { - return super.evaluate(join, bindings); - } - } - - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Slice slice, BindingSet bindings) throws QueryEvaluationException { - if(isSupported(slice)) { - return evaluateNative(slice, bindings); - } else { - return super.evaluate(slice, bindings); - } - } - - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Reduced reduced, BindingSet bindings) throws 
QueryEvaluationException { - if(isSupported(reduced)) { - return evaluateNative(reduced, bindings); - } else { - return super.evaluate(reduced, bindings); - } - } - - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Distinct distinct, BindingSet bindings) throws QueryEvaluationException { - if(isSupported(distinct)) { - return evaluateNative(distinct, bindings); - } else { - return super.evaluate(distinct, bindings); - } - } - - public CloseableIteration<BindingSet, QueryEvaluationException> evaluateNative(TupleExpr expr, BindingSet bindings) throws QueryEvaluationException { - log.debug("applying KiWi native optimizations on SPARQL query ..."); - - try { - return new ExceptionConvertingIteration<BindingSet, QueryEvaluationException>(connection.evaluateNative(expr, bindings, dataset, projectedVars)) { - @Override - protected QueryEvaluationException convert(Exception e) { - return new QueryEvaluationException(e); - } - }; - } catch (SQLException e) { - throw new QueryEvaluationException(e); - } catch (IllegalArgumentException e) { - throw new QueryEvaluationException(e); - } catch (InterruptedException e) { - throw new QueryInterruptedException(e); - } - } - - - - /** - * Test if a tuple expression is supported nby the optimized evaluation; in this case we can apply a specific optimization. 
- * - * @param expr - * @return - */ - private boolean isSupported(TupleExpr expr) { - return new SupportedFinder(expr, connection.getDialect()).isSupported(); - } - -} http://git-wip-us.apache.org/repos/asf/marmotta/blob/89df9e03/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/persistence/KiWiSparqlConnection.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/persistence/KiWiSparqlConnection.java b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/persistence/KiWiSparqlConnection.java deleted file mode 100644 index 822f510..0000000 --- a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/persistence/KiWiSparqlConnection.java +++ /dev/null @@ -1,274 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.marmotta.kiwi.sparql.persistence; - -import info.aduna.iteration.CloseableIteration; -import info.aduna.iteration.CloseableIteratorIteration; -import info.aduna.iteration.EmptyIteration; -import info.aduna.iteration.Iterations; -import org.apache.marmotta.commons.vocabulary.XSD; -import org.apache.marmotta.kiwi.model.rdf.KiWiNode; -import org.apache.marmotta.kiwi.persistence.KiWiConnection; -import org.apache.marmotta.kiwi.persistence.KiWiDialect; -import org.apache.marmotta.kiwi.persistence.util.ResultSetIteration; -import org.apache.marmotta.kiwi.persistence.util.ResultTransformerFunction; -import org.apache.marmotta.kiwi.sail.KiWiValueFactory; -import org.apache.marmotta.kiwi.sparql.builder.ProjectionType; -import org.apache.marmotta.kiwi.sparql.builder.SQLBuilder; -import org.apache.marmotta.kiwi.sparql.builder.model.SQLVariable; -import org.apache.marmotta.kiwi.sparql.exception.UnsatisfiableQueryException; -import org.openrdf.model.URI; -import org.openrdf.model.impl.BNodeImpl; -import org.openrdf.model.impl.LiteralImpl; -import org.openrdf.model.impl.URIImpl; -import org.openrdf.query.Binding; -import org.openrdf.query.BindingSet; -import org.openrdf.query.Dataset; -import org.openrdf.query.algebra.TupleExpr; -import org.openrdf.query.impl.MapBindingSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.*; - -/** - * Provide improved SPARQL support by evaluating certain common compley SPARQL constructs directly on the - * database (e.g. JOIN over pattern queries). - * <p/> - * Implemented using a decorator pattern (i.e. wrapping the KiWiConnection). 
- * - * @author Sebastian Schaffert ([email protected]) - */ -public class KiWiSparqlConnection { - - private static Logger log = LoggerFactory.getLogger(KiWiSparqlConnection.class); - - private KiWiConnection parent; - private KiWiValueFactory valueFactory; - - private ExecutorService executorService; - - public KiWiSparqlConnection(KiWiConnection parent, KiWiValueFactory valueFactory) throws SQLException { - this.parent = parent; - this.valueFactory = valueFactory; - - // interruptible queries run in a separate thread - this.executorService = Executors.newCachedThreadPool(); - } - - /** - * Evaluate a statement pattern join or filter on the database by translating it into an appropriate SQL statement. - * Copied and adapted from KiWiReasoningConnection.query() - * - * @param join - * @param dataset - * @return - */ - public CloseableIteration<BindingSet, SQLException> evaluateNative(TupleExpr join, final BindingSet bindings, final Dataset dataset, Set<String> projectedVars) throws SQLException, InterruptedException { - - try { - final SQLBuilder builder = new SQLBuilder(join, bindings, dataset, valueFactory, parent.getDialect(), projectedVars); - - final PreparedStatement queryStatement = parent.getJDBCConnection().prepareStatement(builder.build().toString()); - if (parent.getDialect().isCursorSupported()) { - queryStatement.setFetchSize(parent.getConfiguration().getCursorSize()); - } - - Future<ResultSet> queryFuture = - executorService.submit(new Callable<ResultSet>() { - @Override - public ResultSet call() throws Exception { - try { - return queryStatement.executeQuery(); - } catch (SQLException ex) { - if (Thread.interrupted()) { - log.info("SQL query execution cancelled; not returning result (Thread={})", Thread.currentThread()); - throw new InterruptedException("SPARQL query execution cancelled"); - } else { - throw ex; - } - } - } - } - ); - - try { - ResultSet result = queryFuture.get(); - - ResultSetIteration<BindingSet> it = new 
ResultSetIteration<BindingSet>(result, true, new ResultTransformerFunction<BindingSet>() { - @Override - public BindingSet apply(ResultSet row) throws SQLException { - MapBindingSet resultRow = new MapBindingSet(); - - List<SQLVariable> vars = new ArrayList<>(builder.getVariables().values()); - - long[] nodeIds = new long[vars.size()]; - for(int i=0; i<vars.size(); i++) { - SQLVariable sv = vars.get(i); - if(sv.getProjectionType() == ProjectionType.NODE && (builder.getProjectedVars().isEmpty() || builder.getProjectedVars().contains(sv.getSparqlName()))) { - nodeIds[i] = row.getLong(sv.getName()); - } - } - KiWiNode[] nodes = parent.loadNodesByIds(nodeIds); - - for (int i = 0; i < vars.size(); i++) { - SQLVariable sv = vars.get(i); - if(nodes[i] != null) { - // resolved node - resultRow.addBinding(sv.getSparqlName(), nodes[i]); - } else if(sv.getProjectionType() != ProjectionType.NONE && (builder.getProjectedVars().isEmpty() || builder.getProjectedVars().contains(sv.getSparqlName()))) { - // literal value - String svalue; - switch (sv.getProjectionType()) { - case URI: - svalue = row.getString(sv.getName()); - if(svalue != null) - resultRow.addBinding(sv.getSparqlName(), new URIImpl(svalue)); - break; - case BNODE: - svalue = row.getString(sv.getName()); - if(svalue != null) - resultRow.addBinding(sv.getSparqlName(), new BNodeImpl(svalue)); - break; - case INT: - if(row.getObject(sv.getName()) != null) { - svalue = Integer.toString(row.getInt(sv.getName())); - URI type = XSD.Integer; - try { - long typeId = row.getLong(sv.getName() + "_TYPE"); - if (typeId > 0) - type = (URI) parent.loadNodeById(typeId); - } catch (SQLException ex) { - } - - resultRow.addBinding(sv.getSparqlName(), new LiteralImpl(svalue, type)); - } - break; - case DOUBLE: - if(row.getObject(sv.getName()) != null) { - svalue = Double.toString(row.getDouble(sv.getName())); - URI type = XSD.Double; - try { - long typeId = row.getLong(sv.getName() + "_TYPE"); - if (typeId > 0) - type = (URI) 
parent.loadNodeById(typeId); - } catch (SQLException ex) { - } - - resultRow.addBinding(sv.getSparqlName(), new LiteralImpl(svalue, type)); - } - break; - case BOOL: - if(row.getObject(sv.getName()) != null) { - svalue = Boolean.toString(row.getBoolean(sv.getName())); - resultRow.addBinding(sv.getSparqlName(), new LiteralImpl(svalue.toLowerCase(), XSD.Boolean)); - } - break; - case STRING: - default: - svalue = row.getString(sv.getName()); - - if(svalue != null) { - - // retrieve optional type and language information, because string functions - // need to preserve this in certain cases, even when constructing new literals - String lang = null; - try { - lang = row.getString(sv.getName() + "_LANG"); - } catch (SQLException ex) { - } - - URI type = null; - try { - long typeId = row.getLong(sv.getName() + "_TYPE"); - if (typeId > 0) - type = (URI) parent.loadNodeById(typeId); - } catch (SQLException ex) { - } - - if (lang != null) { - if (svalue.length() > 0) { - resultRow.addBinding(sv.getSparqlName(), new LiteralImpl(svalue, lang)); - } else { - // string functions that return empty literal should yield no type or language - resultRow.addBinding(sv.getSparqlName(), new LiteralImpl("")); - } - } else if (type != null) { - if(type.stringValue().equals(XSD.String.stringValue())) { - // string functions on other datatypes than string should yield no binding - if (svalue.length() > 0) { - resultRow.addBinding(sv.getSparqlName(), new LiteralImpl(svalue, type)); - } else { - // string functions that return empty literal should yield no type or language - resultRow.addBinding(sv.getSparqlName(), new LiteralImpl("")); - } - } - } else { - resultRow.addBinding(sv.getSparqlName(), new LiteralImpl(svalue)); - } - - } - break; - } - } - } - - - if (bindings != null) { - for (Binding binding : bindings) { - resultRow.addBinding(binding); - } - } - return resultRow; - } - }); - - // materialize result to avoid having more than one result set open at the same time - return new 
CloseableIteratorIteration<BindingSet, SQLException>(Iterations.asList(it).iterator()); - } catch (InterruptedException | CancellationException e) { - log.info("SPARQL query execution cancelled"); - queryFuture.cancel(true); - queryStatement.cancel(); - queryStatement.close(); - - throw new InterruptedException("SPARQL query execution cancelled"); - } catch (ExecutionException e) { - log.error("error executing SPARQL query", e.getCause()); - if (e.getCause() instanceof SQLException) { - throw (SQLException) e.getCause(); - } else if (e.getCause() instanceof InterruptedException) { - throw (InterruptedException) e.getCause(); - } else { - throw new SQLException("error executing SPARQL query", e); - } - } - } catch (UnsatisfiableQueryException ex) { - return new EmptyIteration<>(); - } - } - - public KiWiDialect getDialect() { - return parent.getDialect(); - } -} http://git-wip-us.apache.org/repos/asf/marmotta/blob/89df9e03/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/sail/KiWiSparqlSail.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/sail/KiWiSparqlSail.java b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/sail/KiWiSparqlSail.java index 4ce504e..ba19159 100644 --- a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/sail/KiWiSparqlSail.java +++ b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/sail/KiWiSparqlSail.java @@ -24,7 +24,6 @@ import org.apache.marmotta.kiwi.persistence.pgsql.PostgreSQLDialect; import org.apache.marmotta.kiwi.persistence.util.ScriptRunner; import org.apache.marmotta.kiwi.sail.KiWiSailConnection; import org.apache.marmotta.kiwi.sail.KiWiStore; -import org.apache.marmotta.kiwi.sparql.persistence.KiWiSparqlConnection; import org.openrdf.sail.*; import org.openrdf.sail.helpers.NotifyingSailWrapper; import 
org.openrdf.sail.helpers.SailConnectionWrapper; @@ -201,11 +200,7 @@ public class KiWiSparqlSail extends NotifyingSailWrapper { NotifyingSailConnection connection = super.getConnection(); KiWiSailConnection root = getRootConnection(connection); - try { - return new KiWiSparqlSailConnection(connection, new KiWiSparqlConnection(root.getDatabaseConnection(), root.getValueFactory()), root.getValueFactory()); - } catch (SQLException e) { - throw new SailException(e); - } + return new KiWiSparqlSailConnection(connection, root.getDatabaseConnection(), root.getValueFactory()); } http://git-wip-us.apache.org/repos/asf/marmotta/blob/89df9e03/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/sail/KiWiSparqlSailConnection.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/sail/KiWiSparqlSailConnection.java b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/sail/KiWiSparqlSailConnection.java index 1006ebb..09333e7 100644 --- a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/sail/KiWiSparqlSailConnection.java +++ b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/sail/KiWiSparqlSailConnection.java @@ -18,13 +18,13 @@ package org.apache.marmotta.kiwi.sparql.sail; import info.aduna.iteration.CloseableIteration; +import org.apache.marmotta.kiwi.persistence.KiWiConnection; import org.apache.marmotta.kiwi.sail.KiWiValueFactory; import org.apache.marmotta.kiwi.sparql.evaluation.KiWiEvaluationStatistics; -import org.apache.marmotta.kiwi.sparql.evaluation.KiWiEvaluationStrategyImpl; +import org.apache.marmotta.kiwi.sparql.evaluation.KiWiEvaluationStrategy; import org.apache.marmotta.kiwi.sparql.evaluation.KiWiTripleSource; import org.apache.marmotta.kiwi.sparql.optimizer.DifferenceOptimizer; import org.apache.marmotta.kiwi.sparql.optimizer.DistinctLimitOptimizer; -import 
org.apache.marmotta.kiwi.sparql.persistence.KiWiSparqlConnection; import org.openrdf.query.BindingSet; import org.openrdf.query.Dataset; import org.openrdf.query.QueryEvaluationException; @@ -48,10 +48,10 @@ public class KiWiSparqlSailConnection extends NotifyingSailConnectionWrapper { private static Logger log = LoggerFactory.getLogger(KiWiSparqlSailConnection.class); - private KiWiSparqlConnection connection; + private KiWiConnection connection; private KiWiValueFactory valueFactory; - public KiWiSparqlSailConnection(NotifyingSailConnection parent, KiWiSparqlConnection connection, KiWiValueFactory valueFactory) { + public KiWiSparqlSailConnection(NotifyingSailConnection parent, KiWiConnection connection, KiWiValueFactory valueFactory) { super(parent); this.connection = connection; this.valueFactory = valueFactory; @@ -70,7 +70,7 @@ public class KiWiSparqlSailConnection extends NotifyingSailConnectionWrapper { try { KiWiTripleSource tripleSource = new KiWiTripleSource(this,valueFactory,includeInferred); - EvaluationStrategy strategy = new KiWiEvaluationStrategyImpl(tripleSource, dataset, connection); + EvaluationStrategy strategy = new KiWiEvaluationStrategy(tripleSource, dataset, connection, valueFactory); new BindingAssigner().optimize(tupleExpr, dataset, bindings); //new ConstantOptimizer(strategy).optimize(tupleExpr, dataset, bindings);
