http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c12f58f4/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java index 65a775f..eb0f042 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecompJoinOptimizer.java @@ -8,9 +8,9 @@ package mvm.rya.indexing.external; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,13 +19,11 @@ package mvm.rya.indexing.external; * under the License. */ - import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import mvm.rya.api.RdfCloudTripleStoreConfiguration; @@ -38,25 +36,22 @@ import mvm.rya.indexing.accumulo.ConfigUtils; import mvm.rya.indexing.external.QueryVariableNormalizer.VarCollector; import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet; import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; +import mvm.rya.indexing.external.tupleSet.PcjTables; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException; import mvm.rya.rdftriplestore.inference.DoNotExpandSP; import mvm.rya.rdftriplestore.utils.FixedStatementPattern; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; import org.openrdf.query.BindingSet; import org.openrdf.query.Dataset; import org.openrdf.query.MalformedQueryException; import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.BinaryTupleOperator; import org.openrdf.query.algebra.BindingSetAssignment; import org.openrdf.query.algebra.Difference; import org.openrdf.query.algebra.Distinct; @@ -86,688 +81,783 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -//optimizer which matches TupleExpressions associated with pre-computed queries -//to sub-queries of a given query. Each matched sub-query is replaced by an indexing node -//to delegate that portion of the query to the pre-computed query index +/** + * {@link QueryOptimizer} which matches TupleExpressions associated with + * pre-computed queries to sub-queries of a given query. Each matched sub-query + * is replaced by an indexing node to delegate that portion of the query to the + * pre-computed query index. + * <p> + * + * A query can be broken up into "Join segments", which subsets of the query + * joined only by {@link Join} nodes. Any portions of the query that are + * attached by {@link BinaryTupleOperator} or {@link UnaryTupleOperator} nodes other + * than a Join node mark the beginning of a new Join segment. Pre-computed query + * indices, or {@link ExternalTupleset} objects, are compared against the query + * nodes in each of its Join segments and replace any nodes which match the + * nodes in the ExternalTupleSet's TupleExpr. + * + */ + public class PrecompJoinOptimizer implements QueryOptimizer, Configurable { - private List<ExternalTupleSet> indexSet; - private Configuration conf; - private boolean init = false; - - public PrecompJoinOptimizer() { - } - - public PrecompJoinOptimizer(Configuration conf) { - this.conf = conf; - try { - indexSet = getAccIndices(conf); - init = true; - } catch (MalformedQueryException e) { - e.printStackTrace(); - } catch (SailException e) { - e.printStackTrace(); - } catch (QueryEvaluationException e) { - e.printStackTrace(); - } catch (TableNotFoundException e) { - e.printStackTrace(); - } catch (AccumuloException e) { - e.printStackTrace(); - } catch (AccumuloSecurityException e) { - e.printStackTrace(); - } - } - - public PrecompJoinOptimizer(List<ExternalTupleSet> indices, boolean useOptimalPcj) { - this.indexSet = indices; - conf = new Configuration(); - conf.setBoolean(ConfigUtils.USE_OPTIMAL_PCJ, useOptimalPcj); - } - - public void setConf(Configuration conf) { - this.conf = conf; - if (!init) { - try { - indexSet = getAccIndices(conf); - init = true; - } catch (MalformedQueryException e) { - e.printStackTrace(); - } catch (SailException e) { - e.printStackTrace(); - } catch (QueryEvaluationException e) { - e.printStackTrace(); - } catch (TableNotFoundException e) { - e.printStackTrace(); - } catch (AccumuloException e) { - e.printStackTrace(); - } catch (AccumuloSecurityException e) { - e.printStackTrace(); - } - } - } - - @Override - public Configuration getConf() { - return conf; - } - - - @Override - public void optimize(TupleExpr tupleExpr, Dataset dataset, BindingSet bindings) { - - IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator(tupleExpr, indexSet); - JoinVisitor jv = new JoinVisitor(); - - if (ConfigUtils.getUseOptimalPCJ(conf) && indexSet.size() > 0) { - - //get potential relevant index combinations - ValidIndexCombinationGenerator vic = new ValidIndexCombinationGenerator(tupleExpr); - Iterator<List<ExternalTupleSet>> iter = vic.getValidIndexCombos(iep.getNormalizedIndices()); - TupleExpr bestTup = null; - TupleExpr tempTup = null; - double tempCost = 0; - double minCost = Double.MAX_VALUE; - - while (iter.hasNext()) { - //apply join visitor to place external index nodes in query - TupleExpr clone = tupleExpr.clone(); - jv.setExternalTupList(iter.next()); - jv.setSegmentFilters(new ArrayList<Filter>()); - clone.visit(jv); - - //get all valid execution plans for given external index combination by considering all - //permutations of nodes in TupleExpr - IndexPlanValidator ipv = new IndexPlanValidator(false); - Iterator<TupleExpr> validTups = ipv.getValidTuples(TupleReArranger.getTupleReOrderings(clone).iterator()); - - //set valid plan according to a specified cost threshold, where cost depends on specified weights - //for number of external index nodes, common variables among joins in execution plan, and number of - //external products in execution plan - ThreshholdPlanSelector tps = new ThreshholdPlanSelector(tupleExpr); - tempTup = tps.getThreshholdQueryPlan(validTups, .4, .5, .2, .3); - - //choose best threshhold TupleExpr among all index node combinations - tempCost = tps.getCost(tempTup, .5, .2, .3); - if(tempCost < minCost ) { - minCost = tempCost; - bestTup = tempTup; - } - } - if (bestTup != null) { - ((UnaryTupleOperator) tupleExpr).setArg(((UnaryTupleOperator) bestTup).getArg()); - } - return; - } else { - if (indexSet.size() > 0) { - jv.setExternalTupList(iep.getNormalizedIndices()); - tupleExpr.visit(jv); - } - return; - } - } - - protected class JoinVisitor extends QueryModelVisitorBase<RuntimeException> { - - private List<ExternalTupleSet> tupList; - private List<Filter> segmentFilters = Lists.newArrayList(); - - public void setExternalTupList(List<ExternalTupleSet> tupList) { - this.tupList = tupList; - } - - public void setSegmentFilters(List<Filter> segmentFilters) { - this.segmentFilters = segmentFilters; - } - - @Override - public void meet(Join node) { - - //get all filters with bindings in this segment - updateFilters(segmentFilters, true); - - try { - if (node.getLeftArg() instanceof FixedStatementPattern && node.getRightArg() instanceof DoNotExpandSP) { - return; - } - - //get nodes in this join segment - TupleExpr newJoin = null; - List<QueryModelNode> args = getJoinArgs(node, new ArrayList<QueryModelNode>(), false); - List<TupleExpr> joinArgs = Lists.newArrayList(); - - for (QueryModelNode qNode : args) { - assert (qNode instanceof TupleExpr); - joinArgs.add((TupleExpr) qNode); - } - - //insert all matching ExternalTupleSets in tupList into this segment - joinArgs = matchExternalTupleSets(joinArgs, tupList); - - //push down any filters that have bindings in lower segments - //and update the filters in this segment - updateFilters(segmentFilters, false); - - //form join from matching ExternalTupleSets, remaining nodes, and filters - //that can't be pushed down any further - newJoin = getNewJoin(joinArgs, getFilterChain(segmentFilters)); - - // Replace old join hierarchy - node.replaceWith(newJoin); - - //visit remaining nodes to match ExternalTupleSets with nodes further down - for (TupleExpr te : joinArgs) { - if (!(te instanceof StatementPattern) && !(te instanceof ExternalTupleSet)) { - segmentFilters = Lists.newArrayList(); - te.visit(this); - } - } - - } catch (Exception e) { - e.printStackTrace(); - } - } - - - @Override - public void meet(Filter node) { - segmentFilters.add(node); - node.getArg().visit(this); - } - - //chain filters together and return front and back of chain - private List<TupleExpr> getFilterChain(List<Filter> filters) { - List<TupleExpr> filterTopBottom = Lists.newArrayList(); - Filter filterChainTop = null; - Filter filterChainBottom = null; - - for (Filter filter: filters) { - if (filterChainTop == null) { - filterChainTop = filter; - } else if (filterChainBottom == null) { - filterChainBottom = filter; - filterChainTop.setArg(filterChainBottom); - } else { - filterChainBottom.setArg(filter); - filterChainBottom = filter; - } - } - if(filterChainTop != null) { - filterTopBottom.add(filterChainTop); - } - if(filterChainBottom != null) { - filterTopBottom.add(filterChainBottom); - } - return filterTopBottom; - } - - //build newJoin node given remaining joinArgs and chain of filters - private TupleExpr getNewJoin(List<TupleExpr> args, List<TupleExpr> filterChain) { - TupleExpr newJoin; - List<TupleExpr> joinArgs = Lists.newArrayList(args); - - if (joinArgs.size() > 1) { - if (filterChain.size() > 0) { - TupleExpr finalJoinArg = joinArgs.remove(0); - TupleExpr tempJoin; - TupleExpr temp = filterChain.get(0); - - if (joinArgs.size() > 1) { - tempJoin = new Join(joinArgs.remove(0), joinArgs.remove(0)); - for (TupleExpr te : joinArgs) { - tempJoin = new Join(tempJoin, te); - } - } else { - tempJoin = joinArgs.remove(0); - } - - if (filterChain.size() == 1) { - ((Filter) temp).setArg(tempJoin); - } else { - ((Filter) filterChain.get(1)).setArg(tempJoin); - } - newJoin = new Join(temp, finalJoinArg); - } else { - newJoin = new Join(joinArgs.get(0), joinArgs.get(1)); - joinArgs.remove(0); - joinArgs.remove(0); - - for (TupleExpr te : joinArgs) { - newJoin = new Join(newJoin, te); - } - } - } else if (joinArgs.size() == 1) { - if (filterChain.size() > 0) { - newJoin = filterChain.get(0); - if (filterChain.size() == 1) { - ((Filter) newJoin).setArg(joinArgs.get(0)); - } else { - ((Filter) filterChain.get(1)).setArg(joinArgs.get(0)); - } - } else { - newJoin = joinArgs.get(0); - } - } else { - throw new IllegalStateException("JoinArgs size cannot be zero."); - } - return newJoin; - } - - - private List<TupleExpr> matchExternalTupleSets(List<TupleExpr> joinArgs, List<ExternalTupleSet> tupList) { - - Set<QueryModelNode> argSet = Sets.newHashSet(); - argSet.addAll(joinArgs); - - if(argSet.size() < joinArgs.size()) { - throw new IllegalArgumentException("Query has duplicate nodes in segment!"); - } - - Set<QueryModelNode> firstJoinFilterCond = Sets.newHashSet(); - - for(Filter filter: segmentFilters) { - firstJoinFilterCond.add(filter.getCondition()); - } - - argSet.addAll(firstJoinFilterCond); - - //see if ExternalTupleSet nodes are a subset of joinArgs, and if so, replacing matching nodes - //with ExternalTupleSet - for (ExternalTupleSet tup : tupList) { - TupleExpr tupleArg = tup.getTupleExpr(); - if (isTupleValid(tupleArg)) { - List<QueryModelNode> tupJoinArgs = getJoinArgs(tupleArg, - new ArrayList<QueryModelNode>(), true); - Set<QueryModelNode> tupJoinArgSet = Sets.newHashSet(tupJoinArgs); - if(tupJoinArgSet.size() < tupJoinArgs.size()) { - throw new IllegalArgumentException("ExternalTuple contains duplicate nodes!"); - } - if (argSet.containsAll(tupJoinArgSet)) { - argSet = Sets.newHashSet(Sets.difference(argSet, tupJoinArgSet)); - argSet.add((ExternalTupleSet) tup.clone()); - } - } - } - - //update segment filters by removing those use in ExternalTupleSet - Iterator<Filter> iter = segmentFilters.iterator(); - - while(iter.hasNext()) { - Filter filt = iter.next(); - if(!argSet.contains(filt.getCondition())) { - filt.replaceWith(filt.getArg()); - iter.remove(); - } - } - - //update joinArgs - joinArgs = Lists.newArrayList(); - for(QueryModelNode node: argSet) { - if(!(node instanceof ValueExpr)) { - joinArgs.add((TupleExpr)node); - } - } - - return joinArgs; - } - - - private void updateFilters(List<Filter> filters, boolean firstJoin) { - - Iterator<Filter> iter = segmentFilters.iterator(); - - while (iter.hasNext()) { - if (!FilterRelocator.relocate(iter.next(), firstJoin)) { - iter.remove(); - } - } - } - - protected List<QueryModelNode> getJoinArgs(TupleExpr tupleExpr, List<QueryModelNode> joinArgs, boolean getFilters) { - if (tupleExpr instanceof Join) { - if (!(((Join) tupleExpr).getLeftArg() instanceof FixedStatementPattern) - && !(((Join) tupleExpr).getRightArg() instanceof DoNotExpandSP)) { - Join join = (Join) tupleExpr; - getJoinArgs(join.getLeftArg(), joinArgs, getFilters); - getJoinArgs(join.getRightArg(), joinArgs, getFilters); - } - } else if(tupleExpr instanceof Filter) { - if (getFilters) { - joinArgs.add(((Filter) tupleExpr).getCondition()); - } - getJoinArgs(((Filter)tupleExpr).getArg(), joinArgs, getFilters); - } else if(tupleExpr instanceof Projection) { - getJoinArgs(((Projection)tupleExpr).getArg(), joinArgs, getFilters); - } else { - joinArgs.add(tupleExpr); - } - - return joinArgs; - } - } - - protected static class FilterRelocator extends QueryModelVisitorBase<RuntimeException> { - - - protected final Filter filter; - - protected final Set<String> filterVars; - private boolean stopAtFirstJoin = false; - private boolean isFirstJoinFilter = false; - private boolean inSegment = true; - - - public FilterRelocator(Filter filter) { - this.filter = filter; - filterVars = VarNameCollector.process(filter.getCondition()); - } - - public FilterRelocator(Filter filter, boolean stopAtFirstJoin) { - this.filter = filter; - filterVars = VarNameCollector.process(filter.getCondition()); - this.stopAtFirstJoin = stopAtFirstJoin; - } - - public static boolean relocate(Filter filter) { - FilterRelocator fr = new FilterRelocator(filter); - filter.visit(fr); - return fr.inSegment; - } - - public static boolean relocate(Filter filter, boolean stopAtFirstJoin) { - if (stopAtFirstJoin) { - FilterRelocator fr = new FilterRelocator(filter, stopAtFirstJoin); - filter.visit(fr); - return fr.isFirstJoinFilter; - } else { - FilterRelocator fr = new FilterRelocator(filter); - filter.visit(fr); - return fr.inSegment; - } - } - - - @Override - protected void meetNode(QueryModelNode node) { - // By default, do not traverse - assert node instanceof TupleExpr; - - if(node instanceof UnaryTupleOperator) { - if (((UnaryTupleOperator)node).getArg().getBindingNames().containsAll(filterVars)) { - if (stopAtFirstJoin) { - ((UnaryTupleOperator) node).getArg().visit(this); - } else { - inSegment = false; - relocate(filter, ((UnaryTupleOperator) node).getArg()); - } - } - } - - relocate(filter, (TupleExpr) node); - } - - - @Override - public void meet(Join join) { - - if (stopAtFirstJoin) { - isFirstJoinFilter = true; - relocate(filter, join); - } else { - - if (join.getLeftArg().getBindingNames().containsAll(filterVars)) { - // All required vars are bound by the left expr - join.getLeftArg().visit(this); - } else if (join.getRightArg().getBindingNames().containsAll(filterVars)) { - // All required vars are bound by the right expr - join.getRightArg().visit(this); - } else { - relocate(filter, join); - } - } - } - - @Override - public void meet(LeftJoin leftJoin) { - - if (leftJoin.getLeftArg().getBindingNames().containsAll(filterVars)) { - inSegment = false; - if (stopAtFirstJoin) { - leftJoin.getLeftArg().visit(this); - } else { - relocate(filter, leftJoin.getLeftArg()); - } - } - else { - relocate(filter, leftJoin); - } - } - - @Override - public void meet(Union union) { - Filter clone = new Filter(); - clone.setCondition(filter.getCondition().clone()); - - relocate(filter, union.getLeftArg()); - relocate(clone, union.getRightArg()); - - inSegment = false; - - } - - @Override - public void meet(Difference node) { - Filter clone = new Filter(); - clone.setCondition(filter.getCondition().clone()); - - relocate(filter, node.getLeftArg()); - relocate(clone, node.getRightArg()); - - inSegment = false; - - } - - @Override - public void meet(Intersection node) { - Filter clone = new Filter(); - clone.setCondition(filter.getCondition().clone()); - - relocate(filter, node.getLeftArg()); - relocate(clone, node.getRightArg()); - - inSegment = false; - - } - - @Override - public void meet(Extension node) { - if (node.getArg().getBindingNames().containsAll(filterVars)) { - if (stopAtFirstJoin) { - node.getArg().visit(this); - } else { - relocate(filter, node.getArg()); - inSegment = false; - } - } - else { - relocate(filter, node); - } - } - - @Override - public void meet(EmptySet node) { - if (filter.getParentNode() != null) { - // Remove filter from its original location - filter.replaceWith(filter.getArg()); - } - } - - @Override - public void meet(Filter filter) { - // Filters are commutative - filter.getArg().visit(this); - } - - @Override - public void meet(Distinct node) { - node.getArg().visit(this); - } - - @Override - public void meet(Order node) { - node.getArg().visit(this); - } - - @Override - public void meet(QueryRoot node) { - node.getArg().visit(this); - } - - @Override - public void meet(Reduced node) { - node.getArg().visit(this); - } - - protected void relocate(Filter filter, TupleExpr newFilterArg) { - if (filter.getArg() != newFilterArg) { - if (filter.getParentNode() != null) { - // Remove filter from its original location - filter.replaceWith(filter.getArg()); - } - - // Insert filter at the new location - newFilterArg.replaceWith(filter); - filter.setArg(newFilterArg); - } - } - } - - - private static boolean isTupleValid(QueryModelNode node) { - - ValidQueryVisitor vqv = new ValidQueryVisitor(); - node.visit(vqv); - - if (vqv.isValid() && vqv.getSPs().size() > 1) { - if(vqv.getFilters().size() > 0) { - Set<String> spVars = getVarNames(vqv.getSPs()); - Set<String> fVarNames = getVarNames(vqv.getFilters()); - //check that all vars contained in filters also occur in SPs - return Sets.intersection(fVarNames,spVars).equals(fVarNames); - } else { - return true; - } - } else { - return false; - } - } - - - private static Set<String> getVarNames(Collection<QueryModelNode> nodes) { - - List<String> tempVars; - Set<String> nodeVarNames = Sets.newHashSet(); - - for (QueryModelNode s : nodes) { - tempVars = VarCollector.process(s); - for (String t : tempVars) - nodeVarNames.add(t); - } - return nodeVarNames; - } - - - private static class ValidQueryVisitor extends QueryModelVisitorBase<RuntimeException> { - - private boolean isValid = true; - private Set<QueryModelNode> filterSet = Sets.newHashSet(); - private Set<QueryModelNode> spSet = Sets.newHashSet(); - - public Set<QueryModelNode> getFilters() { - return filterSet; - } - - public Set<QueryModelNode> getSPs() { - return spSet; - } - - public boolean isValid() { - return isValid; - } - - public void meet(Projection node) { - node.getArg().visit(this); - } - - @Override - public void meet(Filter node) { - filterSet.add(node.getCondition()); - node.getArg().visit(this); - } - - @Override - public void meet(StatementPattern node) { - spSet.add(node); - } - - public void meetNode(QueryModelNode node) { - - if (!((node instanceof Join) || (node instanceof StatementPattern) || (node instanceof BindingSetAssignment) || - (node instanceof Var) || (node instanceof Union) || (node instanceof LeftJoin))) { - isValid = false; - return; - - } else{ - super.meetNode(node); - } - } - - } - - - private static List<ExternalTupleSet> getAccIndices(Configuration conf) throws MalformedQueryException, - SailException, QueryEvaluationException, TableNotFoundException, AccumuloException, - AccumuloSecurityException { - - List<String> tables = null; - - if (conf instanceof RdfCloudTripleStoreConfiguration) { - tables = ((RdfCloudTripleStoreConfiguration) conf).getPcjTables(); - } - - String tablePrefix = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX); - Connector c = ConfigUtils.getConnector(conf); - Map<String, String> indexTables = Maps.newLinkedHashMap(); - - if (tables != null && !tables.isEmpty()) { - for (String table : tables) { - Scanner s = c.createScanner(table, new Authorizations()); - s.setRange(Range.exact(new Text("~SPARQL"))); - for (Entry<Key, Value> e : s) { - indexTables.put(table, e.getValue().toString()); - } - } - } else { - for (String table : c.tableOperations().list()) { - if (table.startsWith(tablePrefix + "INDEX")) { - Scanner s = c.createScanner(table, new Authorizations()); - s.setRange(Range.exact(new Text("~SPARQL"))); - for (Entry<Key, Value> e : s) { - indexTables.put(table, e.getValue().toString()); - } - } - } - - } - List<ExternalTupleSet> index = Lists.newArrayList(); - - if (indexTables.isEmpty()) { - System.out.println("No Index found"); - } else { - for (String table : indexTables.keySet()) { - String indexSparqlString = indexTables.get(table); - index.add(new AccumuloIndexSet(indexSparqlString, c, table)); - } - } - return index; - } + private List<ExternalTupleSet> indexSet; + private Configuration conf; + private boolean init = false; + + public PrecompJoinOptimizer() { + } + + public PrecompJoinOptimizer(Configuration conf) { + this.conf = conf; + try { + indexSet = getAccIndices(conf); + } catch (MalformedQueryException | SailException + | QueryEvaluationException | TableNotFoundException + | AccumuloException | AccumuloSecurityException | PcjException e) { + e.printStackTrace(); + } + init = true; + } + + public PrecompJoinOptimizer(List<ExternalTupleSet> indices, + boolean useOptimalPcj) { + this.indexSet = indices; + conf = new Configuration(); + conf.setBoolean(ConfigUtils.USE_OPTIMAL_PCJ, useOptimalPcj); + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + if (!init) { + try { + indexSet = getAccIndices(conf); + } catch (MalformedQueryException | SailException + | QueryEvaluationException | TableNotFoundException + | AccumuloException | AccumuloSecurityException + | PcjException e) { + e.printStackTrace(); + } + init = true; + } + } + + @Override + public Configuration getConf() { + return conf; + } + + /** + * @param tupleExpr + * -- query whose query plan will be optimized -- specified + * ExternalTupleSet nodes contained in will be placed in query + * plan where an ExternalTupleSet TupleExpr matches the query's + * sub-query + */ + @Override + public void optimize(TupleExpr tupleExpr, Dataset dataset, + BindingSet bindings) { + + final IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator( + tupleExpr, indexSet); + final JoinVisitor jv = new JoinVisitor(); + + if (ConfigUtils.getUseOptimalPCJ(conf) && indexSet.size() > 0) { + + // get potential relevant index combinations + final ValidIndexCombinationGenerator vic = new ValidIndexCombinationGenerator( + tupleExpr); + final Iterator<List<ExternalTupleSet>> iter = vic + .getValidIndexCombos(iep.getNormalizedIndices()); + TupleExpr bestTup = null; + TupleExpr tempTup = null; + double tempCost = 0; + double minCost = Double.MAX_VALUE; + + while (iter.hasNext()) { + // apply join visitor to place external index nodes in query + final TupleExpr clone = tupleExpr.clone(); + jv.setExternalTupList(iter.next()); + jv.setSegmentFilters(new ArrayList<Filter>()); + clone.visit(jv); + + // get all valid execution plans for given external index + // combination by considering all + // permutations of nodes in TupleExpr + final IndexPlanValidator ipv = new IndexPlanValidator(false); + final Iterator<TupleExpr> validTups = ipv + .getValidTuples(TupleReArranger.getTupleReOrderings( + clone).iterator()); + + // set valid plan according to a specified cost threshold, where + // cost depends on specified weights + // for number of external index nodes, common variables among + // joins in execution plan, and number of + // external products in execution plan + final ThreshholdPlanSelector tps = new ThreshholdPlanSelector( + tupleExpr); + tempTup = tps.getThreshholdQueryPlan(validTups, .4, .5, .2, .3); + + // choose best threshhold TupleExpr among all index node + // combinations + tempCost = tps.getCost(tempTup, .5, .2, .3); + if (tempCost < minCost) { + minCost = tempCost; + bestTup = tempTup; + } + } + if (bestTup != null) { + ((UnaryTupleOperator) tupleExpr) + .setArg(((UnaryTupleOperator) bestTup).getArg()); + } + return; + } else { + if (indexSet.size() > 0) { + jv.setExternalTupList(iep.getNormalizedIndices()); + tupleExpr.visit(jv); + } + return; + } + } + + /** + * Given a list of @ ExternalTuleSet} , this visitor navigates the query + * {@link TupleExpr} specified in the + * {@link PrecompJoinOptimizer#optimize(TupleExpr, Dataset, BindingSet) and + * matches the TupleExpr in the ExternalTupleSet with sub-queries of the + * query and replaces the sub-query with the ExternalTupleSet node. + * + */ + protected class JoinVisitor extends QueryModelVisitorBase<RuntimeException> { + + private List<ExternalTupleSet> tupList; + private List<Filter> segmentFilters = Lists.newArrayList(); + + public void setExternalTupList(List<ExternalTupleSet> tupList) { + this.tupList = tupList; + } + + public void setSegmentFilters(List<Filter> segmentFilters) { + this.segmentFilters = segmentFilters; + } + + @Override + public void meet(Join node) { + + // get all filters with bindings in this segment + updateFilters(segmentFilters, true); + + try { + if (node.getLeftArg() instanceof FixedStatementPattern + && node.getRightArg() instanceof DoNotExpandSP) { + return; + } + + // get nodes in this join segment + TupleExpr newJoin = null; + final List<QueryModelNode> args = getJoinArgs(node, + new ArrayList<QueryModelNode>(), false); + List<TupleExpr> joinArgs = Lists.newArrayList(); + + for (final QueryModelNode qNode : args) { + assert qNode instanceof TupleExpr; + joinArgs.add((TupleExpr) qNode); + } + + // insert all matching ExternalTupleSets in tupList into this + // segment + joinArgs = matchExternalTupleSets(joinArgs, tupList); + + // push down any filters that have bindings in lower segments + // and update the filters in this segment + updateFilters(segmentFilters, false); + + // form join from matching ExternalTupleSets, remaining nodes, + // and filters + // that can't be pushed down any further + newJoin = getNewJoin(joinArgs, getFilterChain(segmentFilters)); + + // Replace old join hierarchy + node.replaceWith(newJoin); + + // visit remaining nodes to match ExternalTupleSets with nodes + // further down + for (final TupleExpr te : joinArgs) { + if (!(te instanceof StatementPattern) + && !(te instanceof ExternalTupleSet)) { + segmentFilters = Lists.newArrayList(); + te.visit(this); + } + } + + } catch (final Exception e) { + e.printStackTrace(); + } + } + + @Override + public void meet(Filter node) { + segmentFilters.add(node); + node.getArg().visit(this); + } + + // chain filters together and return front and back of chain + private List<TupleExpr> getFilterChain(List<Filter> filters) { + final List<TupleExpr> filterTopBottom = Lists.newArrayList(); + Filter filterChainTop = null; + Filter filterChainBottom = null; + + for (final Filter filter : filters) { + filter.replaceWith(filter.getArg()); + if (filterChainTop == null) { + filterChainTop = filter; + } else if (filterChainBottom == null) { + filterChainBottom = filter; + filterChainTop.setArg(filterChainBottom); + } else { + filterChainBottom.setArg(filter); + filterChainBottom = filter; + } + } + if (filterChainTop != null) { + filterTopBottom.add(filterChainTop); + } + if (filterChainBottom != null) { + filterTopBottom.add(filterChainBottom); + } + return filterTopBottom; + } + + // build newJoin node given remaining joinArgs and chain of filters + private TupleExpr getNewJoin(List<TupleExpr> args, + List<TupleExpr> filterChain) { + TupleExpr newJoin; + final List<TupleExpr> joinArgs = Lists.newArrayList(args); + + if (joinArgs.size() > 1) { + if (filterChain.size() > 0) { + final TupleExpr finalJoinArg = joinArgs.remove(0); + TupleExpr tempJoin; + final TupleExpr temp = filterChain.get(0); + + if (joinArgs.size() > 1) { + tempJoin = new Join(joinArgs.remove(0), + joinArgs.remove(0)); + for (final TupleExpr te : joinArgs) { + tempJoin = new Join(tempJoin, te); + } + } else { + tempJoin = joinArgs.remove(0); + } + + if (filterChain.size() == 1) { + ((Filter) temp).setArg(tempJoin); + } else { + ((Filter) filterChain.get(1)).setArg(tempJoin); + } + newJoin = new Join(temp, finalJoinArg); + } else { + newJoin = new Join(joinArgs.get(0), joinArgs.get(1)); + joinArgs.remove(0); + joinArgs.remove(0); + + for (final TupleExpr te : joinArgs) { + newJoin = new Join(newJoin, te); + } + } + } else if (joinArgs.size() == 1) { + if (filterChain.size() > 0) { + newJoin = filterChain.get(0); + if (filterChain.size() == 1) { + ((Filter) newJoin).setArg(joinArgs.get(0)); + } else { + ((Filter) filterChain.get(1)).setArg(joinArgs.get(0)); + } + } else { + newJoin = joinArgs.get(0); + } + } else { + throw new IllegalStateException("JoinArgs size cannot be zero."); + } + return newJoin; + } + + /** + * + * @param joinArgs + * -- list of non-join nodes contained in the join segment + * @param tupList + * -- list of indices to match sub-queries in this join + * segment + * @return updated list of non-join nodes, where any nodes matching an + * index are replaced by that index + */ + private List<TupleExpr> matchExternalTupleSets( + List<TupleExpr> joinArgs, List<ExternalTupleSet> tupList) { + + List<TupleExpr> bsaList = new ArrayList<>(); + Set<QueryModelNode> argSet = Sets.newHashSet(); + for (TupleExpr te : joinArgs) { + if (te instanceof BindingSetAssignment) { + bsaList.add(te); + } else { + argSet.add(te); + } + } + + if (argSet.size() + bsaList.size() < joinArgs.size()) { + throw new IllegalArgumentException( + "Query has duplicate nodes in segment!"); + } + + final Set<QueryModelNode> firstJoinFilterCond = Sets.newHashSet(); + + for (final Filter filter : segmentFilters) { + firstJoinFilterCond.add(filter.getCondition()); + } + + argSet.addAll(firstJoinFilterCond); + + // see if ExternalTupleSet nodes are a subset of joinArgs, and if + // so, replacing matching nodes + // with ExternalTupleSet + for (final ExternalTupleSet tup : tupList) { + final TupleExpr tupleArg = tup.getTupleExpr(); + if (isTupleValid(tupleArg)) { + final List<QueryModelNode> tupJoinArgs = getJoinArgs( + tupleArg, new ArrayList<QueryModelNode>(), true); + final Set<QueryModelNode> tupJoinArgSet = Sets + .newHashSet(tupJoinArgs); + if (tupJoinArgSet.size() < tupJoinArgs.size()) { + throw new IllegalArgumentException( + "ExternalTuple contains duplicate nodes!"); + } + if (argSet.containsAll(tupJoinArgSet)) { + argSet = Sets.newHashSet(Sets.difference(argSet, + tupJoinArgSet)); + argSet.add(tup.clone()); + } + } + } + + // update segment filters by removing those use in ExternalTupleSet + final Iterator<Filter> iter = segmentFilters.iterator(); + + while (iter.hasNext()) { + final Filter filt = iter.next(); + if (!argSet.contains(filt.getCondition())) { + filt.replaceWith(filt.getArg()); + iter.remove(); + } + } + + // update joinArgs + joinArgs = Lists.newArrayList(); + for (final QueryModelNode node : argSet) { + if (!(node instanceof ValueExpr)) { + joinArgs.add((TupleExpr) node); + } + } + joinArgs.addAll(bsaList); + + return joinArgs; + } + + private void updateFilters(List<Filter> filters, boolean firstJoin) { + + final Iterator<Filter> iter = segmentFilters.iterator(); + + while (iter.hasNext()) { + if (!FilterRelocator.relocate(iter.next(), firstJoin)) { + iter.remove(); + } + } + } + + /** + * + * @param tupleExpr + * -- the query + * @param joinArgs + * -- the non-join nodes contained in the join segment + * @param getFilters + * -- the filters contained in the query + * @return -- the non-join nodes contained in the join segment + */ + protected List<QueryModelNode> getJoinArgs(TupleExpr tupleExpr, + List<QueryModelNode> joinArgs, boolean getFilters) { + if (tupleExpr instanceof Join) { + if (!(((Join) tupleExpr).getLeftArg() instanceof FixedStatementPattern) + && !(((Join) tupleExpr).getRightArg() instanceof DoNotExpandSP)) { + final Join join = (Join) tupleExpr; + getJoinArgs(join.getLeftArg(), joinArgs, getFilters); + getJoinArgs(join.getRightArg(), joinArgs, getFilters); + } + } else if (tupleExpr instanceof Filter) { + if (getFilters) { + joinArgs.add(((Filter) tupleExpr).getCondition()); + } + getJoinArgs(((Filter) tupleExpr).getArg(), joinArgs, getFilters); + } else if (tupleExpr instanceof Projection) { + getJoinArgs(((Projection) tupleExpr).getArg(), joinArgs, + getFilters); + } else { + joinArgs.add(tupleExpr); + } + + return joinArgs; + } + } + + /** + * Relocates filters based on the binding variables contained in the + * {@link Filter}. If you don't specify the FilterRelocator to stop at the + * first {@link Join}, the relocator pushes the filter as far down the query + * plan as possible, checking if the nodes below contain its binding + * variables. If stopAtFirstJoin = true, the Filter is inserted at the first + * Join node encountered. The relocator tracks whether the node stays in the + * join segment or is inserted outside of the Join segment and returns true + * if the Filter stays in the segment and false otherwise. + * + */ + + protected static class FilterRelocator extends + QueryModelVisitorBase<RuntimeException> { + + protected final Filter filter; + + protected final Set<String> filterVars; + private boolean stopAtFirstJoin = false; + private boolean isFirstJoinFilter = false; + private boolean inSegment = true; + + public FilterRelocator(Filter filter) { + this.filter = filter; + filterVars = VarNameCollector.process(filter.getCondition()); + } + + public FilterRelocator(Filter filter, boolean stopAtFirstJoin) { + this.filter = filter; + filterVars = VarNameCollector.process(filter.getCondition()); + this.stopAtFirstJoin = stopAtFirstJoin; + } + + public static boolean relocate(Filter filter) { + final FilterRelocator fr = new FilterRelocator(filter); + filter.visit(fr); + return fr.inSegment; + } + + public static boolean relocate(Filter filter, boolean stopAtFirstJoin) { + if (stopAtFirstJoin) { + final FilterRelocator fr = new FilterRelocator(filter, + stopAtFirstJoin); + filter.visit(fr); + return fr.isFirstJoinFilter; + } else { + final FilterRelocator fr = new FilterRelocator(filter); + filter.visit(fr); + return fr.inSegment; + } + } + + @Override + protected void meetNode(QueryModelNode node) { + // By default, do not traverse + assert node instanceof TupleExpr; + + if (node instanceof UnaryTupleOperator) { + if (((UnaryTupleOperator) node).getArg().getBindingNames() + .containsAll(filterVars)) { + if (stopAtFirstJoin) { + ((UnaryTupleOperator) node).getArg().visit(this); + } else { + inSegment = false; + relocate(filter, ((UnaryTupleOperator) node).getArg()); + } + } + } + + relocate(filter, (TupleExpr) node); + } + + @Override + public void meet(Join join) { + + if (stopAtFirstJoin) { + isFirstJoinFilter = true; + relocate(filter, join); + } else { + + if (join.getLeftArg().getBindingNames().containsAll(filterVars)) { + // All required vars are bound by the left expr + join.getLeftArg().visit(this); + } else if (join.getRightArg().getBindingNames() + .containsAll(filterVars)) { + // All required vars are bound by the right expr + join.getRightArg().visit(this); + } else { + relocate(filter, join); + } + } + } + + @Override + public void meet(LeftJoin leftJoin) { + + if (leftJoin.getLeftArg().getBindingNames().containsAll(filterVars)) { + inSegment = false; + if (stopAtFirstJoin) { + leftJoin.getLeftArg().visit(this); + } else { + relocate(filter, leftJoin.getLeftArg()); + } + } else { + relocate(filter, leftJoin); + } + } + + @Override + public void meet(Union union) { + final Filter clone = new Filter(); + clone.setCondition(filter.getCondition().clone()); + + relocate(filter, union.getLeftArg()); + relocate(clone, union.getRightArg()); + + inSegment = false; + + } + + @Override + public void meet(Difference node) { + final Filter clone = new Filter(); + clone.setCondition(filter.getCondition().clone()); + + relocate(filter, node.getLeftArg()); + relocate(clone, node.getRightArg()); + + inSegment = false; + + } + + @Override + public void meet(Intersection node) { + final Filter clone = new Filter(); + clone.setCondition(filter.getCondition().clone()); + + relocate(filter, node.getLeftArg()); + relocate(clone, node.getRightArg()); + + inSegment = false; + + } + + @Override + public void meet(Extension node) { + if (node.getArg().getBindingNames().containsAll(filterVars)) { + if (stopAtFirstJoin) { + node.getArg().visit(this); + } else { + relocate(filter, node.getArg()); + inSegment = false; + } + } else { + relocate(filter, node); + } + } + + @Override + public void meet(EmptySet node) { + if (filter.getParentNode() != null) { + // Remove filter from its original location + filter.replaceWith(filter.getArg()); + } + } + + @Override + public void meet(Filter filter) { + // Filters are commutative + filter.getArg().visit(this); + } + + @Override + public void meet(Distinct node) { + node.getArg().visit(this); + } + + @Override + public void meet(Order node) { + node.getArg().visit(this); + } + + @Override + public void meet(QueryRoot node) { + node.getArg().visit(this); + } + + @Override + public void meet(Reduced node) { + node.getArg().visit(this); + } + + protected void relocate(Filter filter, TupleExpr newFilterArg) { + if (filter.getArg() != newFilterArg) { + if (filter.getParentNode() != null) { + // Remove filter from its original location + filter.replaceWith(filter.getArg()); + } + + // Insert filter at the new location + newFilterArg.replaceWith(filter); + filter.setArg(newFilterArg); + } + } + } + + /** + * This method determines whether an index node is valid. Criteria for a + * valid node are that is have two or more {@link StatementPattern} nodes or + * at least one {@link Filter} and one StatementPattern node. Additionally, + * the number of variables in the Filter cannot exceed the number of + * variables among all non-Filter nodes in the TupleExpr. Also, this method + * calls the {@link ValidQueryVisitor} to determine if the + * ExternalTupleSet's TupleExpr contains an invalid node type. + * + * @param node + * -- typically an {@link ExternalTupleSet} index node + * @return + */ + private static boolean isTupleValid(QueryModelNode node) { + + final ValidQueryVisitor vqv = new ValidQueryVisitor(); + node.visit(vqv); + + if (vqv.isValid() && vqv.getSPs().size() + vqv.getFilters().size() > 1) { + if (vqv.getFilters().size() > 0) { + final Set<String> spVars = getVarNames(vqv.getSPs()); + final Set<String> fVarNames = getVarNames(vqv.getFilters()); + // check that all vars contained in filters also occur in SPs + return Sets.intersection(fVarNames, spVars).equals(fVarNames); + } else { + return true; + } + } else { + return false; + } + } + + private static Set<String> getVarNames(Collection<QueryModelNode> nodes) { + + List<String> tempVars; + final Set<String> nodeVarNames = Sets.newHashSet(); + + for (final QueryModelNode s : nodes) { + tempVars = VarCollector.process(s); + for (final String t : tempVars) { + nodeVarNames.add(t); + } + } + return nodeVarNames; + } + + /** + * A visitor which checks a TupleExpr associated with an ExternalTupleSet to + * determine whether the TupleExpr contains an invalid node. + * + */ + private static class ValidQueryVisitor extends + QueryModelVisitorBase<RuntimeException> { + + private boolean isValid = true; + private final Set<QueryModelNode> filterSet = Sets.newHashSet(); + private final Set<QueryModelNode> spSet = Sets.newHashSet(); + + public Set<QueryModelNode> getFilters() { + return filterSet; + } + + public Set<QueryModelNode> getSPs() { + return spSet; + } + + public boolean isValid() { + return isValid; + } + + @Override + public void meet(Projection node) { + node.getArg().visit(this); + } + + @Override + public void meet(Filter node) { + filterSet.add(node.getCondition()); + node.getArg().visit(this); + } + + @Override + public void meet(StatementPattern node) { + spSet.add(node); + } + + @Override + public void meetNode(QueryModelNode node) { + + if (!(node instanceof Join || node instanceof StatementPattern + || node instanceof BindingSetAssignment + || node instanceof Var || node instanceof Union || node instanceof LeftJoin)) { + isValid = false; + return; + + } else { + super.meetNode(node); + } + } + + } + + private static List<ExternalTupleSet> getAccIndices(Configuration conf) + throws MalformedQueryException, SailException, + QueryEvaluationException, TableNotFoundException, + AccumuloException, AccumuloSecurityException, PcjException { + + List<String> tables = null; + + if (conf instanceof RdfCloudTripleStoreConfiguration) { + tables = ((RdfCloudTripleStoreConfiguration) conf).getPcjTables(); + } + + final String tablePrefix = conf + .get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX); + final Connector c = ConfigUtils.getConnector(conf); + final Map<String, String> indexTables = Maps.newLinkedHashMap(); + PcjTables pcj = new PcjTables(); + + if (tables != null && !tables.isEmpty()) { + for (final String table : tables) { + indexTables + .put(table, pcj.getPcjMetadata(c, table).getSparql()); + } + } else { + for (final String table : c.tableOperations().list()) { + if (table.startsWith(tablePrefix + "INDEX")) { + indexTables.put(table, pcj.getPcjMetadata(c, table) + .getSparql()); + } + } + + } + final List<ExternalTupleSet> index = Lists.newArrayList(); + + if (indexTables.isEmpty()) { + System.out.println("No Index found"); + } else { + for (final String table : indexTables.keySet()) { + final String indexSparqlString = indexTables.get(table); + index.add(new AccumuloIndexSet(indexSparqlString, c, table)); + } + } + return index; + } }
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c12f58f4/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java index dda452d..456c465 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java @@ -8,9 +8,9 @@ package mvm.rya.indexing.external.tupleSet; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,608 +19,373 @@ package mvm.rya.indexing.external.tupleSet; * under the License. */ - - import info.aduna.iteration.CloseableIteration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.NoSuchElementException; import java.util.Set; -import mvm.rya.accumulo.precompQuery.AccumuloPrecompQueryIndexer; +import mvm.rya.accumulo.precompQuery.AccumuloPcjQuery; +import mvm.rya.api.utils.IteratorWrapper; +import mvm.rya.indexing.PcjQuery; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata; +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; import mvm.rya.rdftriplestore.evaluation.ExternalBatchingIterator; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; -import org.openrdf.model.Literal; -import org.openrdf.model.URI; -import org.openrdf.model.impl.LiteralImpl; -import org.openrdf.model.impl.URIImpl; import org.openrdf.query.Binding; import org.openrdf.query.BindingSet; import org.openrdf.query.MalformedQueryException; import org.openrdf.query.QueryEvaluationException; import org.openrdf.query.algebra.Projection; -import org.openrdf.query.algebra.QueryModelNode; -import org.openrdf.query.algebra.StatementPattern; -import org.openrdf.query.algebra.ValueExpr; +import org.openrdf.query.algebra.TupleExpr; import org.openrdf.query.algebra.Var; import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; -import org.openrdf.query.impl.EmptyBindingSet; import org.openrdf.query.parser.ParsedTupleQuery; import org.openrdf.query.parser.sparql.SPARQLParser; -import org.openrdf.repository.sail.SailRepositoryConnection; import org.openrdf.sail.SailException; -import com.beust.jcommander.internal.Sets; import com.google.common.base.Joiner; -import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; - +import com.google.common.collect.Sets; + +/** + * During query planning, this node is inserted into the parsed query to + * represent part of the original query (a sub-query). This sub-query is the + * value returned by {@link ExternalTupleSet#getTupleExpr()}. The results + * associated with this sub-query are stored in an external Accumulo table, + * where accCon and tablename are the associated {@link Connector} and table + * name. During evaluation, the portion of the query in + * {@link AccumuloIndexSet} is evaluated by scanning the external Accumulo + * table. This class is extremely useful for caching queries and reusing results + * from previous SPARQL queries. + * <p> + * + * The the {@link TupleExpr} returned by {@link ExternalTupleSet#getTupleExpr()} + * may have different variables than the query and variables stored in the + * external Accumulo table. The mapping of variables from the TupleExpr to the + * table variables are given by {@link ExternalTupleSet#getTableVarMap()}. In + * addition to allowing the variables to differ, it is possible for TupleExpr to + * have fewer variables than the table query--that is, some of the variables in + * the table query may appear as constants in the TupleExpr. Theses expression + * are extracted from TupleExpr by the methods + * {@link AccumuloIndexSet#getConstantConstraints()} and by the Visitor + * {@link ValueMapVisitor} to be used as constraints when scanning the Accumulo + * table. This allows for the pre-computed results to be used for a larger class + * of sub-queries. + * + */ public class AccumuloIndexSet extends ExternalTupleSet implements ExternalBatchingIterator { - private static final int WRITER_MAX_WRITE_THREADS = 30; - private static final long WRITER_MAX_LATNECY = Long.MAX_VALUE; - private static final long WRITER_MAX_MEMORY = 500L * 1024L * 1024L; - private Map<String,AccValueFactory> bindings; - private List<String> bindingslist; - private final Connector accCon; - private final String tablename; - private long tableSize = 0; - private List<String> varOrder = null; - - - public static interface AccValueFactory { - public org.openrdf.model.Value create(String str); - - public String create(org.openrdf.model.Value val); - } - - public static class AccUrlFactory implements AccValueFactory { - @Override - public org.openrdf.model.Value create(final String str) { - return new URIImpl(str); - } + private final Connector accCon; //connector to Accumulo table where results are stored + private final String tablename; //name of Accumulo table + private List<String> varOrder = null; // orders in which results are written to table + private PcjTables pcj = new PcjTables(); - @Override - public String create(org.openrdf.model.Value val) { - return val.stringValue(); - } + @Override + public Map<String, Set<String>> getSupportedVariableOrders() { + return this.getSupportedVariableOrderMap(); } - - public static class AccValueFactoryImpl implements AccValueFactory { - @Override - public org.openrdf.model.Value create(final String str) { - String[] split = str.split("\u0001"); - if (split.length > 1 && split[1].equals("1")) { - return new URIImpl(split[0]); - } - if (split[0].contains(":")) { - return new URIImpl(split[0]); - } - return new LiteralImpl(split[0]); - } - @Override - public String create(org.openrdf.model.Value val) { - if (val instanceof URI) { - return val.stringValue() + "\u0001" + 1; - } - if (val instanceof Literal) { - Literal v = (Literal) val; - return v.getLabel() + "\u0001" + 2; - } - return null; - } + @Override + public String getSignature() { + return "AccumuloIndexSet(" + tablename + ") : " + Joiner.on(", ").join(this.getTupleExpr().getAssuredBindingNames()); } - - //TODO set supportedVarOrderMap - public AccumuloIndexSet(String sparql, SailRepositoryConnection conn, Connector accCon, String tablename) throws MalformedQueryException, SailException, - QueryEvaluationException, MutationsRejectedException, TableNotFoundException { - super(null); + /** + * + * @param sparql - name of sparql query whose results will be stored in PCJ table + * @param accCon - connection to a valid Accumulo instance + * @param tablename - name of an existing PCJ table + * @throws MalformedQueryException + * @throws SailException + * @throws QueryEvaluationException + * @throws MutationsRejectedException + * @throws TableNotFoundException + */ + public AccumuloIndexSet(String sparql, Connector accCon, String tablename) throws MalformedQueryException, SailException, QueryEvaluationException, + MutationsRejectedException, TableNotFoundException { this.tablename = tablename; this.accCon = accCon; - SPARQLParser sp = new SPARQLParser(); - ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(sparql, null); - + final SPARQLParser sp = new SPARQLParser(); + final ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(sparql, null); setProjectionExpr((Projection) pq.getTupleExpr()); - CloseableIteration<BindingSet,QueryEvaluationException> iter = (CloseableIteration<BindingSet,QueryEvaluationException>) conn.getSailConnection() - .evaluate(getTupleExpr(), null, new EmptyBindingSet(), false); - - BatchWriter w = accCon.createBatchWriter(tablename, WRITER_MAX_MEMORY, WRITER_MAX_LATNECY, WRITER_MAX_WRITE_THREADS); - this.bindingslist = Lists.newArrayList(pq.getTupleExpr().getAssuredBindingNames()); - - this.bindings = Maps.newHashMap(); - - pq.getTupleExpr().visit(new QueryModelVisitorBase<RuntimeException>() { - @Override - public void meet(Var node) { - QueryModelNode parent = node.getParentNode(); - if (parent instanceof StatementPattern) { - StatementPattern statement = (StatementPattern) parent; - if (node.equals(statement.getSubjectVar())) { - bindings.put(node.getName(), new AccUrlFactory()); - } - if (node.equals(statement.getPredicateVar())) { - bindings.put(node.getName(), new AccUrlFactory()); - } - if (node.equals(statement.getObjectVar())) { - bindings.put(node.getName(), new AccValueFactoryImpl()); - } - if (node.equals(statement.getContextVar())) { - // TODO is this correct? - bindings.put(node.getName(), new AccUrlFactory()); - } - } else if(parent instanceof ValueExpr) { - bindings.put(node.getName(), new AccValueFactoryImpl()); - } - }; - }); - - - - - - varOrder = new ArrayList<String>(bindingslist.size()); - - while (iter.hasNext()) { - - BindingSet bs = iter.next(); - List<String> shiftBindingList = null; - for (int j = 0; j < bindingslist.size(); j++) { - StringBuffer sb = new StringBuffer(); - shiftBindingList = listShift(bindingslist, j); //TODO calling this each time not efficient - String order = ""; - for (String b : shiftBindingList) { - String val = bindings.get(b).create(bs.getValue(b)); - sb.append(val).append("\u0000"); - if (order.length() == 0) { - order = b; - } else { - order = order + "\u0000" + b; - } - } - - if (varOrder.size() < bindingslist.size()) { - varOrder.add(order); - } - - //System.out.println("String buffer is " + sb); - Mutation m = new Mutation(sb.deleteCharAt(sb.length() - 1).toString()); - m.put(new Text(varOrder.get(j)), new Text(""), new org.apache.accumulo.core.data.Value(new byte[]{})); - w.addMutation(m); - } - tableSize += 1; + Set<VariableOrder> orders = null; + try { + orders = pcj.getPcjMetadata(accCon, tablename).getVarOrders(); + } catch (final PcjException e) { + e.printStackTrace(); + } + + varOrder = Lists.newArrayList(); + for(final VariableOrder var: orders) { + varOrder.add(var.toString()); } - setLocalityGroups(tablename, accCon, varOrder); - this.setSupportedVariableOrderMap(createSupportedVarOrderMap(varOrder)); - - - String orders = ""; - - for(String s : varOrder) { - s = s.replace("\u0000", ";"); - if(orders.length() == 0) { - orders = s; - } else { - orders = orders + "\u0000" + s; - } - } - - - Mutation m = new Mutation("~SPARQL"); - Value v = new Value(sparql.getBytes()); - m.put(new Text("" + tableSize), new Text(orders), v); - w.addMutation(m); - - w.close(); - iter.close(); + this.setSupportedVariableOrderMap(varOrder); } - - - + /** + * + * @param accCon - connection to a valid Accumulo instance + * @param tablename - name of an existing PCJ table + * @throws MalformedQueryException + * @throws SailException + * @throws QueryEvaluationException + * @throws MutationsRejectedException + * @throws TableNotFoundException + */ + public AccumuloIndexSet(Connector accCon, String tablename) + throws MalformedQueryException, SailException, + QueryEvaluationException, MutationsRejectedException, + TableNotFoundException { + PcjMetadata meta = null; + try { + meta = pcj.getPcjMetadata(accCon, tablename); + } catch (final PcjException e) { + e.printStackTrace(); + } + + this.tablename = tablename; + this.accCon = accCon; + final SPARQLParser sp = new SPARQLParser(); + final ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(meta.getSparql(), + null); + setProjectionExpr((Projection) pq.getTupleExpr()); + final Set<VariableOrder> orders = meta.getVarOrders(); + + varOrder = Lists.newArrayList(); + for (final VariableOrder var : orders) { + varOrder.add(var.toString()); + } + setLocalityGroups(tablename, accCon, varOrder); + this.setSupportedVariableOrderMap(varOrder); + } + + /** + * returns size of table for query planning + */ @Override - public Map<String, Set<String>> getSupportedVariableOrders() { - - return this.getSupportedVariableOrderMap(); - + public double cardinality() { + double cardinality = 0; + try { + cardinality = pcj.getPcjMetadata(accCon, tablename).getCardinality(); + } catch (PcjException e) { + e.printStackTrace(); + } + return cardinality; } - - - @Override - public boolean supportsBindingSet(Set<String> bindingNames) { - - Map<String, Set<String>> varOrderMap = this.getSupportedVariableOrders(); - Collection<Set<String>> values = varOrderMap.values(); - Set<String> bNames = Sets.newHashSet(); - for (String s : this.getTupleExpr().getAssuredBindingNames()) { - if (bindingNames.contains(s)) { - bNames.add(s); - } - } - - return values.contains(bNames); - } - - - private String getVarOrder(Set<String> variables) { - Map<String, Set<String>> varOrderMap = this.getSupportedVariableOrders(); + /** + * + * @param tableName + * @param conn + * @param groups - locality groups to be created + * + * Sets locality groups for more efficient scans - these are usually the variable + * orders in the table so that scans for specific orders are more efficient + */ + private void setLocalityGroups(String tableName, Connector conn, List<String> groups) { - Set<Map.Entry<String, Set<String>>> entries = varOrderMap.entrySet(); + final HashMap<String, Set<Text>> localityGroups = new HashMap<String, Set<Text>>(); + for (int i = 0; i < groups.size(); i++) { + final HashSet<Text> tempColumn = new HashSet<Text>(); + tempColumn.add(new Text(groups.get(i))); + final String groupName = groups.get(i).replace(VAR_ORDER_DELIM, ""); + localityGroups.put(groupName, tempColumn); + } - for (Map.Entry<String, Set<String>> e : entries) { + try { + conn.tableOperations().setLocalityGroups(tableName, localityGroups); + } catch (AccumuloException | AccumuloSecurityException + | TableNotFoundException e) { + e.printStackTrace(); + } - if (e.getValue().equals(variables)) { - return e.getKey(); - } + } - } - return null; + @Override + public CloseableIteration<BindingSet,QueryEvaluationException> evaluate(BindingSet bindingset) throws QueryEvaluationException { + return this.evaluate(Collections.singleton(bindingset)); } - private String prefixToOrder(String order) { - - Map<String, String> invMap = HashBiMap.create(this.getTableVarMap()).inverse(); - String[] temp = order.split("\u0000"); - - for (int i = 0; i < temp.length; i++) { - temp[i] = this.getTableVarMap().get(temp[i]); + /** + * Core evaluation method used during query evaluation - given a collection of binding set constraints, this + * method finds common binding labels between the constraints and table, uses those to build a prefix scan + * of the Accumulo table, and creates a solution binding set by iterating of the scan results. + */ + @Override + public CloseableIteration<BindingSet,QueryEvaluationException> evaluate(final Collection<BindingSet> bindingset) throws QueryEvaluationException { + String localityGroup = ""; + final Set<String> commonVars = Sets.newHashSet(); + // if bindingset is empty, there are no results, so return empty iterator + if (bindingset.isEmpty()) { + return new IteratorWrapper<BindingSet, QueryEvaluationException>(new HashSet<BindingSet>().iterator()); } - - order = Joiner.on("\u0000").join(temp); - - for (String s : varOrder) { - if (s.startsWith(order)) { - - temp = s.split("\u0000"); - - for (int i = 0; i < temp.length; i++) { - temp[i] = invMap.get(temp[i]); + //to build range prefix, find common vars of bindingset and PCJ bindings + else { + final BindingSet bs = bindingset.iterator().next(); + for (final String b : this.getTupleExpr().getAssuredBindingNames()) { + final Binding v = bs.getBinding(b); + if (v != null) { + commonVars.add(b); } - return Joiner.on("\u0000").join(temp); } } - throw new NoSuchElementException("Order is not a prefix of any locality group value!"); + //add any constant constraints to common vars to be used in range prefix + commonVars.addAll(getConstantConstraints()); + PcjQuery apq = null; + List<String> fullVarOrder = null; + String commonVarOrder = null; + try { + if (commonVars.size() > 0) { + commonVarOrder = getVarOrder(commonVars); + if(commonVarOrder == null) { + throw new IllegalStateException("Index does not support binding set!"); + } + fullVarOrder = Lists.newArrayList(prefixToOrder(commonVarOrder).split(VAR_ORDER_DELIM)); + //use varOrder and tableVarMap to set correct scan column + localityGroup = orderToLocGroup(fullVarOrder); + } else { + localityGroup = varOrder.get(0); + } + apq = new AccumuloPcjQuery(accCon, tablename); + final ValueMapVisitor vmv = new ValueMapVisitor(); + this.getTupleExpr().visit(vmv); + + List<String> commonVarOrderList = null; + if(commonVarOrder != null) { + commonVarOrderList = Lists.newArrayList(commonVarOrder.split(VAR_ORDER_DELIM)); + } else { + commonVarOrderList = new ArrayList<>(); + } + + return apq.queryPrecompJoin(commonVarOrderList, localityGroup, vmv.getValMap(), + HashBiMap.create(this.getTableVarMap()).inverse(), bindingset); + } catch(final TableNotFoundException e) { + throw new QueryEvaluationException(e); + } } + /** + * + * @param order - variable order as indicated by query + * @return - locality group or column family used in scan - this + * is just the variable order expressed in terms of the variables stored + * in the table + */ private String orderToLocGroup(List<String> order) { String localityGroup = ""; - for (String s : order) { + for (final String s : order) { if (localityGroup.length() == 0) { localityGroup = this.getTableVarMap().get(s); } else { - localityGroup = localityGroup + "\u0000" + this.getTableVarMap().get(s); + localityGroup = localityGroup + VAR_ORDER_DELIM + this.getTableVarMap().get(s); } } return localityGroup; - } - - - private void setLocalityGroups(String tableName, Connector conn, List<String> groups) { - - HashMap<String, Set<Text>> localityGroups = new HashMap<String, Set<Text>>(); - - - - for (int i = 0; i < groups.size(); i++) { - HashSet<Text> tempColumn = new HashSet<Text>(); - tempColumn.add(new Text(groups.get(i))); - String groupName = groups.get(i).replace("\u0000",""); - localityGroups.put(groupName, tempColumn); - } - - try { - conn.tableOperations().setLocalityGroups(tableName, localityGroups); - } catch (AccumuloException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (AccumuloSecurityException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (TableNotFoundException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - - - } - - - - - - - - private List<String> listShift(List<String> list, int j) { - - if(j >= list.size()) { - throw new IllegalArgumentException(); - } - - List<String> shiftList = Lists.newArrayList(); - for(int i=0; i<list.size(); i++) { - shiftList.add(list.get((i+j)%list.size())); + /** + * + * @param order - prefix of a full variable order + * @return - full variable order that includes all variables whose values + * are stored in the table - used to obtain the locality group + */ + //given partial order of query vars, convert to PCJ vars and determine + //if converted partial order is a substring of a full var order of PCJ variables. + //if converted partial order is a prefix, convert corresponding full PCJ var order to query vars + private String prefixToOrder(String order) { + final Map<String, String> invMap = HashBiMap.create(this.getTableVarMap()).inverse(); + String[] temp = order.split(VAR_ORDER_DELIM); + //get order in terms of PCJ variables + for (int i = 0; i < temp.length; i++) { + temp[i] = this.getTableVarMap().get(temp[i]); } - - return shiftList; - } - - - - private Set<String> getConstantConstraints() { - - Map<String, String> tableMap = this.getTableVarMap(); - Set<String> keys = tableMap.keySet(); - Set<String> constants = Sets.newHashSet(); - - for (String s : keys) { - if (s.startsWith("-const-")) { - constants.add(s); + order = Joiner.on(VAR_ORDER_DELIM).join(temp); + for (final String s : varOrder) { + //verify that partial order is prefix of a PCJ varOrder + if (s.startsWith(order)) { + temp = s.split(VAR_ORDER_DELIM); + //convert full PCJ varOrder back to query varOrder + for (int i = 0; i < temp.length; i++) { + temp[i] = invMap.get(temp[i]); + } + return Joiner.on(VAR_ORDER_DELIM).join(temp); } - } - - return constants; - - } - - - - - public AccumuloIndexSet(String sparql, Connector accCon, String tablename) throws MalformedQueryException, SailException, QueryEvaluationException, - MutationsRejectedException, TableNotFoundException { - super(null); - this.tablename = tablename; - this.accCon = accCon; - SPARQLParser sp = new SPARQLParser(); - ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(sparql, null); - - setProjectionExpr((Projection) pq.getTupleExpr()); - - this.bindingslist = Lists.newArrayList(pq.getTupleExpr().getAssuredBindingNames()); - - this.bindings = Maps.newHashMap(); - pq.getTupleExpr().visit(new QueryModelVisitorBase<RuntimeException>() { - @Override - public void meet(Var node) { - QueryModelNode parent = node.getParentNode(); - if (parent instanceof StatementPattern) { - StatementPattern statement = (StatementPattern) parent; - if (node.equals(statement.getSubjectVar())) { - bindings.put(node.getName(), new AccUrlFactory()); - } - if (node.equals(statement.getPredicateVar())) { - bindings.put(node.getName(), new AccUrlFactory()); - } - if (node.equals(statement.getObjectVar())) { - bindings.put(node.getName(), new AccValueFactoryImpl()); - } - if (node.equals(statement.getContextVar())) { - // TODO is this correct? - bindings.put(node.getName(), new AccUrlFactory()); - } - } else if(parent instanceof ValueExpr) { - bindings.put(node.getName(), new AccValueFactoryImpl()); - } - }; - }); - - - - - Scanner s = accCon.createScanner(tablename, new Authorizations()); - s.setRange(Range.exact(new Text("~SPARQL"))); - Iterator<Entry<Key,Value>> i = s.iterator(); - - String[] tempVarOrders = null; - - if (i.hasNext()) { - Entry<Key, Value> entry = i.next(); - Text ts = entry.getKey().getColumnFamily(); - tempVarOrders = entry.getKey().getColumnQualifier().toString().split("\u0000"); - tableSize = Long.parseLong(ts.toString()); - - } else { - throw new IllegalStateException("Index table contains no metadata!"); - } - - - varOrder = Lists.newArrayList(); - - for(String t: tempVarOrders) { - t = t.replace(";","\u0000"); - varOrder.add(t); - } - - setLocalityGroups(tablename, accCon, varOrder); - this.setSupportedVariableOrderMap(createSupportedVarOrderMap(varOrder)); - + throw new NoSuchElementException("Order is not a prefix of any locality group value!"); } - - - - private Map<String, Set<String>> createSupportedVarOrderMap(List<String> orders) { - - Map<String, Set<String>> supportedVars = Maps.newHashMap(); - - for (String t : orders) { - - String[] tempOrder = t.split("\u0000"); - Set<String> varSet = Sets.newHashSet(); - String u = ""; - - for (String s : tempOrder) { - if(u.length() == 0) { - u = s; - } else{ - u = u+ "\u0000" + s; - } - varSet.add(s); - supportedVars.put(u, new HashSet<String>(varSet)); - + /** + * + * @param variables + * @return - string representation of the Set variables, in an order that is in the + * table + */ + private String getVarOrder(Set<String> variables) { + final Map<String, Set<String>> varOrderMap = this.getSupportedVariableOrders(); + final Set<Map.Entry<String, Set<String>>> entries = varOrderMap.entrySet(); + for (final Map.Entry<String, Set<String>> e : entries) { + if (e.getValue().equals(variables)) { + return e.getKey(); } - } - - return supportedVars; - } - - - - @Override - public void setProjectionExpr(Projection tupleExpr) { - super.setProjectionExpr(tupleExpr); - this.bindingslist = Lists.newArrayList(tupleExpr.getAssuredBindingNames()); - - this.bindings = Maps.newHashMap(); - tupleExpr.visit(new QueryModelVisitorBase<RuntimeException>() { - @Override - public void meet(Var node) { - QueryModelNode parent = node.getParentNode(); - if (parent instanceof StatementPattern) { - StatementPattern statement = (StatementPattern) parent; - if (node.equals(statement.getSubjectVar())) { - bindings.put(node.getName(), new AccUrlFactory()); - } - if (node.equals(statement.getPredicateVar())) { - bindings.put(node.getName(), new AccUrlFactory()); - } - if (node.equals(statement.getObjectVar())) { - bindings.put(node.getName(), new AccValueFactoryImpl()); - } - if (node.equals(statement.getContextVar())) { - // TODO is this correct? - bindings.put(node.getName(), new AccUrlFactory()); - } - } else if (parent instanceof ValueExpr) { //Add bindings associated with Filters - bindings.put(node.getName(), new AccValueFactoryImpl()); - } - }; - }); - - } - - @Override - public String getSignature() { - return "AccumuloIndexSet(" + tablename + ") : " + Joiner.on(", ").join(bindingslist); - } - - @Override - public CloseableIteration<BindingSet,QueryEvaluationException> evaluate(BindingSet bindingset) throws QueryEvaluationException { - return this.evaluate(Collections.singleton(bindingset)); - } - - @Override - public double cardinality() { - return tableSize; + return null; } - @Override - public CloseableIteration<BindingSet,QueryEvaluationException> evaluate(final Collection<BindingSet> bindingset) throws QueryEvaluationException { - - String localityGroup = ""; - Set<String> commonVars = Sets.newHashSet(); - - if (!bindingset.isEmpty()) { - - BindingSet bs = bindingset.iterator().next(); - for (String b : bindingslist) { - Binding v = bs.getBinding(b); - if (v != null) { - commonVars.add(b); - } - - } - } - - commonVars.addAll(getConstantConstraints()); - AccumuloPrecompQueryIndexer apq = null; - List<String> fullVarOrder = null; - try { - - if (commonVars.size() > 0) { - String commonVarOrder = getVarOrder(commonVars); - if(commonVarOrder == null) { - throw new IllegalStateException("Index does not support binding set!"); - } - fullVarOrder = Lists.newArrayList(prefixToOrder(commonVarOrder).split("\u0000")); - localityGroup = orderToLocGroup(fullVarOrder); - fullVarOrder.add("" + commonVars.size()); - - } else { - fullVarOrder = bindingslist; - localityGroup = orderToLocGroup(fullVarOrder); - fullVarOrder.add("" + 0); + /** + * @return - all constraints which correspond to variables + * in {@link AccumuloIndexSet#getTupleExpr()} which are set + * equal to a constant, but are non-constant in Accumulo table + */ + private Set<String> getConstantConstraints() { + final Map<String, String> tableMap = this.getTableVarMap(); + final Set<String> keys = tableMap.keySet(); + final Set<String> constants = Sets.newHashSet(); + for (final String s : keys) { + if (s.startsWith("-const-")) { + constants.add(s); } - - - apq = new AccumuloPrecompQueryIndexer(accCon, tablename); - ValueMapVisitor vmv = new ValueMapVisitor(); - this.getTupleExpr().visit(vmv); - - return apq.queryPrecompJoin(fullVarOrder, localityGroup, this.bindings, vmv.getValMap(), bindingset); - - } catch(TableNotFoundException e) { - throw new QueryEvaluationException(e); - } finally { - IOUtils.closeQuietly(apq); } + return constants; } - - - public class ValueMapVisitor extends QueryModelVisitorBase<RuntimeException> { + /** + * + * Extracts the values associated with constant labels in the query + * Used to create binding sets from range scan + */ + public class ValueMapVisitor extends QueryModelVisitorBase<RuntimeException> { Map<String, org.openrdf.model.Value> valMap = Maps.newHashMap(); - - public Map<String, org.openrdf.model.Value> getValMap() { return valMap; } - @Override public void meet(Var node) { if (node.getName().startsWith("-const-")) { valMap.put(node.getName(), node.getValue()); } - } - } - - + } - + +
