RYA-99 Use PrecomputedJoinStorage to get Pcj tables instead of TableOperations
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/e4ef9a96 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/e4ef9a96 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/e4ef9a96 Branch: refs/heads/master Commit: e4ef9a96b8057ba773e360513fba2210d1c68976 Parents: ab56da1 Author: Caleb Meier <meierca...@gmail.com> Authored: Thu Jul 21 16:05:28 2016 -0400 Committer: Aaron Mihalik <miha...@alum.mit.edu> Committed: Tue Aug 23 10:41:37 2016 -0400 ---------------------------------------------------------------------- .../rya/indexing/pcj/matching/PCJOptimizer.java | 546 ++++++++++--------- .../storage/accumulo/PcjTableNameFactory.java | 12 + 2 files changed, 289 insertions(+), 269 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e4ef9a96/extras/indexing/src/main/java/mvm/rya/indexing/pcj/matching/PCJOptimizer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/pcj/matching/PCJOptimizer.java b/extras/indexing/src/main/java/mvm/rya/indexing/pcj/matching/PCJOptimizer.java index a481c96..a163e0b 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/pcj/matching/PCJOptimizer.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/pcj/matching/PCJOptimizer.java @@ -19,6 +19,8 @@ package mvm.rya.indexing.pcj.matching; * under the License. */ +import static java.util.Objects.requireNonNull; + import java.util.Iterator; import java.util.List; import java.util.Map; @@ -31,6 +33,7 @@ import mvm.rya.indexing.IndexPlanValidator.ThreshholdPlanSelector; import mvm.rya.indexing.IndexPlanValidator.TupleReArranger; import mvm.rya.indexing.IndexPlanValidator.ValidIndexCombinationGenerator; import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.external.accumulo.AccumuloPcjStorage; import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet; import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; @@ -42,7 +45,8 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.storage.PcjException; -import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory; import org.openrdf.query.BindingSet; import org.openrdf.query.Dataset; import org.openrdf.query.MalformedQueryException; @@ -76,273 +80,277 @@ import com.google.common.collect.Maps; */ public class PCJOptimizer implements QueryOptimizer, Configurable { - private static final Logger log = Logger.getLogger(PCJOptimizer.class); - private List<ExternalTupleSet> indexSet; - private Configuration conf; - private boolean init = false; - - public PCJOptimizer() { - } - - public PCJOptimizer(Configuration conf) { - this.conf = conf; - try { - indexSet = PCJOptimizerUtilities.getValidPCJs(getAccIndices(conf)); // TODO - // validate - // PCJs - // during - // table - // creation - } catch (MalformedQueryException | SailException - | QueryEvaluationException | TableNotFoundException - | AccumuloException | AccumuloSecurityException | PcjException e) { - e.printStackTrace(); - } - init = true; - } - - public PCJOptimizer(List<ExternalTupleSet> indices, boolean useOptimalPcj) { - this.indexSet = PCJOptimizerUtilities.getValidPCJs(indices); - conf = new Configuration(); - conf.setBoolean(ConfigUtils.USE_OPTIMAL_PCJ, useOptimalPcj); - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - if (!init) { - try { - indexSet = PCJOptimizerUtilities - .getValidPCJs(getAccIndices(conf)); - } catch (MalformedQueryException | SailException - | QueryEvaluationException | TableNotFoundException - | AccumuloException | AccumuloSecurityException - | PcjException e) { - e.printStackTrace(); - } - init = true; - } - } - - @Override - public Configuration getConf() { - return conf; - } - - /** - * This method optimizes a specified query by matching subsets of it with - * PCJ queries. - * - * @param tupleExpr - * - the query to be optimized - */ - @Override - public void optimize(TupleExpr tupleExpr, Dataset dataset, - BindingSet bindings) { - - Projection projection = PCJOptimizerUtilities.getProjection(tupleExpr); - if (projection == null) { - log.debug("TupleExpr has no Projection. Invalid TupleExpr."); - return; - } - IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator( - tupleExpr, indexSet); - List<ExternalTupleSet> pcjs = iep.getNormalizedIndices(); - // first standardize query by pulling all filters to top of query if - // they exist - // using TopOfQueryFilterRelocator - tupleExpr = TopOfQueryFilterRelocator.moveFiltersToTop(tupleExpr); - - if (ConfigUtils.getUseOptimalPCJ(conf) && pcjs.size() > 0) { - - // get potential relevant index combinations - ValidIndexCombinationGenerator vic = new ValidIndexCombinationGenerator( - tupleExpr); - Iterator<List<ExternalTupleSet>> iter = vic - .getValidIndexCombos(pcjs); - TupleExpr bestTup = null; - TupleExpr tempTup = null; - double tempCost = 0; - double minCost = Double.MAX_VALUE; - - while (iter.hasNext()) { - // apply join visitor to place external index nodes in query - TupleExpr clone = tupleExpr.clone(); - QuerySegmentPCJMatchVisitor.matchPCJs(clone, iter.next()); - - // get all valid execution plans for given external index - // combination by considering all - // permutations of nodes in TupleExpr - IndexPlanValidator ipv = new IndexPlanValidator(false); - Iterator<TupleExpr> validTups = ipv - .getValidTuples(TupleReArranger.getTupleReOrderings( - clone).iterator()); - - // set valid plan according to a specified cost threshold, where - // cost depends on specified weights - // for number of external index nodes, common variables among - // joins in execution plan, and number of - // external products in execution plan - ThreshholdPlanSelector tps = new ThreshholdPlanSelector( - tupleExpr); - tempTup = tps.getThreshholdQueryPlan(validTups, .4, .5, .2, .3); - - // choose best threshhold TupleExpr among all index node - // combinations - tempCost = tps.getCost(tempTup, .5, .2, .3); - if (tempCost < minCost) { - minCost = tempCost; - bestTup = tempTup; - } - } - if (bestTup != null) { - Projection bestTupProject = PCJOptimizerUtilities - .getProjection(bestTup); - projection.setArg(bestTupProject.getArg()); - } - return; - } else if (pcjs.size() > 0) { - QuerySegmentPCJMatchVisitor.matchPCJs(tupleExpr, pcjs); - } else { - return; - } - } - - /** - * This visitor navigates query until it reaches either a Join, Filter, or - * LeftJoin. Once it reaches this node, it gets the appropriate PCJMatcher - * from the {@link QuerySegmentPCJMatchVisitor} and uses this to match each - * of the PCJs to the {@link QuerySegment} starting with the Join, Filter, - * or LeftJoin. Once each PCJ has been compared for matching, the portion of - * the query starting with the Join, Filter, or LeftJoin is replaced by the - * {@link TupleExpr} returned by {@link PCJMatcher#getQuery()}. This visitor - * then visits each of the nodes returned by {@link PCJMatcher#getUnmatchedArgs()}. - * - */ - static class QuerySegmentPCJMatchVisitor extends - QueryModelVisitorBase<RuntimeException> { - - private static List<ExternalTupleSet> pcjs; - private static final QuerySegmentPCJMatchVisitor INSTANCE = new QuerySegmentPCJMatchVisitor(); - - private QuerySegmentPCJMatchVisitor() { - }; - - public static void matchPCJs(TupleExpr te, - List<ExternalTupleSet> indexSet) { - pcjs = indexSet; - te.visit(INSTANCE); - } - - @Override - public void meet(Join node) { - PCJMatcher matcher = PCJMatcherFactory.getPCJMatcher(node); - for (ExternalTupleSet pcj : pcjs) { - matcher.matchPCJ(pcj); - } - - node.replaceWith(matcher.getQuery()); - Set<TupleExpr> unmatched = matcher.getUnmatchedArgs(); - PCJOptimizerUtilities.relocateFilters(matcher.getFilters()); - - for (TupleExpr tupleExpr : unmatched) { - tupleExpr.visit(this); - } - } - - @Override - public void meet(LeftJoin node) { - PCJMatcher matcher = PCJMatcherFactory.getPCJMatcher(node); - for (ExternalTupleSet pcj : pcjs) { - matcher.matchPCJ(pcj); - } - - node.replaceWith(matcher.getQuery()); - Set<TupleExpr> unmatched = matcher.getUnmatchedArgs(); - PCJOptimizerUtilities.relocateFilters(matcher.getFilters()); - - for (TupleExpr tupleExpr : unmatched) { - tupleExpr.visit(this); - } - } - - @Override - public void meet(Filter node) { - PCJMatcher matcher = PCJMatcherFactory.getPCJMatcher(node); - for (ExternalTupleSet pcj : pcjs) { - matcher.matchPCJ(pcj); - } - - node.replaceWith(matcher.getQuery()); - Set<TupleExpr> unmatched = matcher.getUnmatchedArgs(); - PCJOptimizerUtilities.relocateFilters(matcher.getFilters()); - - for (TupleExpr tupleExpr : unmatched) { - tupleExpr.visit(this); - } - } - - } - - /** - * - * - * @param conf - * - client configuration - * - * @return - list of {@link ExternalTupleSet}s or PCJs that are either - * specified by user in Configuration or exist in system. - * - * @throws MalformedQueryException - * @throws SailException - * @throws QueryEvaluationException - * @throws TableNotFoundException - * @throws AccumuloException - * @throws AccumuloSecurityException - * @throws PcjException - */ - private static List<ExternalTupleSet> getAccIndices(Configuration conf) - throws MalformedQueryException, SailException, - QueryEvaluationException, TableNotFoundException, - AccumuloException, AccumuloSecurityException, PcjException { - - List<String> tables = null; - - if (conf instanceof RdfCloudTripleStoreConfiguration) { - tables = ((RdfCloudTripleStoreConfiguration) conf).getPcjTables(); - } - - String tablePrefix = conf - .get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX); - Connector c = ConfigUtils.getConnector(conf); - Map<String, String> indexTables = Maps.newLinkedHashMap(); - PcjTables pcj = new PcjTables(); - - if (tables != null && !tables.isEmpty()) { - for (final String table : tables) { - indexTables - .put(table, pcj.getPcjMetadata(c, table).getSparql()); - } - } else { - for (final String table : c.tableOperations().list()) { - if (table.startsWith(tablePrefix + "INDEX")) { - indexTables.put(table, pcj.getPcjMetadata(c, table) - .getSparql()); - } - } - - } - final List<ExternalTupleSet> index = Lists.newArrayList(); - - if (indexTables.isEmpty()) { - System.out.println("No Index found"); - } else { - for (final String table : indexTables.keySet()) { - final String indexSparqlString = indexTables.get(table); - index.add(new AccumuloIndexSet(indexSparqlString, conf, table)); - } - } - return index; - } + private static final Logger log = Logger.getLogger(PCJOptimizer.class); + private List<ExternalTupleSet> indexSet; + private Configuration conf; + private boolean init = false; + + public PCJOptimizer() { + } + + public PCJOptimizer(Configuration conf) { + this.conf = conf; + try { + indexSet = PCJOptimizerUtilities.getValidPCJs(getAccIndices(conf)); // TODO + // validate + // PCJs + // during + // table + // creation + } catch (MalformedQueryException | SailException + | QueryEvaluationException | TableNotFoundException + | AccumuloException | AccumuloSecurityException | PcjException e) { + e.printStackTrace(); + } + init = true; + } + + public PCJOptimizer(List<ExternalTupleSet> indices, boolean useOptimalPcj) { + this.indexSet = PCJOptimizerUtilities.getValidPCJs(indices); + conf = new Configuration(); + conf.setBoolean(ConfigUtils.USE_OPTIMAL_PCJ, useOptimalPcj); + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + if (!init) { + try { + indexSet = PCJOptimizerUtilities + .getValidPCJs(getAccIndices(conf)); + } catch (MalformedQueryException | SailException + | QueryEvaluationException | TableNotFoundException + | AccumuloException | AccumuloSecurityException + | PcjException e) { + e.printStackTrace(); + } + init = true; + } + } + + @Override + public Configuration getConf() { + return conf; + } + + /** + * This method optimizes a specified query by matching subsets of it with + * PCJ queries. + * + * @param tupleExpr + * - the query to be optimized + */ + @Override + public void optimize(TupleExpr tupleExpr, Dataset dataset, + BindingSet bindings) { + + Projection projection = PCJOptimizerUtilities.getProjection(tupleExpr); + if (projection == null) { + log.debug("TupleExpr has no Projection. Invalid TupleExpr."); + return; + } + IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator( + tupleExpr, indexSet); + List<ExternalTupleSet> pcjs = iep.getNormalizedIndices(); + // first standardize query by pulling all filters to top of query if + // they exist + // using TopOfQueryFilterRelocator + tupleExpr = TopOfQueryFilterRelocator.moveFiltersToTop(tupleExpr); + + if (ConfigUtils.getUseOptimalPCJ(conf) && pcjs.size() > 0) { + + // get potential relevant index combinations + ValidIndexCombinationGenerator vic = new ValidIndexCombinationGenerator( + tupleExpr); + Iterator<List<ExternalTupleSet>> iter = vic + .getValidIndexCombos(pcjs); + TupleExpr bestTup = null; + TupleExpr tempTup = null; + double tempCost = 0; + double minCost = Double.MAX_VALUE; + + while (iter.hasNext()) { + // apply join visitor to place external index nodes in query + TupleExpr clone = tupleExpr.clone(); + QuerySegmentPCJMatchVisitor.matchPCJs(clone, iter.next()); + + // get all valid execution plans for given external index + // combination by considering all + // permutations of nodes in TupleExpr + IndexPlanValidator ipv = new IndexPlanValidator(false); + Iterator<TupleExpr> validTups = ipv + .getValidTuples(TupleReArranger.getTupleReOrderings( + clone).iterator()); + + // set valid plan according to a specified cost threshold, where + // cost depends on specified weights + // for number of external index nodes, common variables among + // joins in execution plan, and number of + // external products in execution plan + ThreshholdPlanSelector tps = new ThreshholdPlanSelector( + tupleExpr); + tempTup = tps.getThreshholdQueryPlan(validTups, .4, .5, .2, .3); + + // choose best threshhold TupleExpr among all index node + // combinations + tempCost = tps.getCost(tempTup, .5, .2, .3); + if (tempCost < minCost) { + minCost = tempCost; + bestTup = tempTup; + } + } + if (bestTup != null) { + Projection bestTupProject = PCJOptimizerUtilities + .getProjection(bestTup); + projection.setArg(bestTupProject.getArg()); + } + return; + } else if (pcjs.size() > 0) { + QuerySegmentPCJMatchVisitor.matchPCJs(tupleExpr, pcjs); + } else { + return; + } + } + + /** + * This visitor navigates query until it reaches either a Join, Filter, or + * LeftJoin. Once it reaches this node, it gets the appropriate PCJMatcher + * from the {@link QuerySegmentPCJMatchVisitor} and uses this to match each + * of the PCJs to the {@link QuerySegment} starting with the Join, Filter, + * or LeftJoin. Once each PCJ has been compared for matching, the portion of + * the query starting with the Join, Filter, or LeftJoin is replaced by the + * {@link TupleExpr} returned by {@link PCJMatcher#getQuery()}. This visitor + * then visits each of the nodes returned by {@link PCJMatcher#getUnmatchedArgs()}. + * + */ + static class QuerySegmentPCJMatchVisitor extends + QueryModelVisitorBase<RuntimeException> { + + private static List<ExternalTupleSet> pcjs; + private static final QuerySegmentPCJMatchVisitor INSTANCE = new QuerySegmentPCJMatchVisitor(); + + private QuerySegmentPCJMatchVisitor() { + }; + + public static void matchPCJs(TupleExpr te, + List<ExternalTupleSet> indexSet) { + pcjs = indexSet; + te.visit(INSTANCE); + } + + @Override + public void meet(Join node) { + PCJMatcher matcher = PCJMatcherFactory.getPCJMatcher(node); + for (ExternalTupleSet pcj : pcjs) { + matcher.matchPCJ(pcj); + } + + node.replaceWith(matcher.getQuery()); + Set<TupleExpr> unmatched = matcher.getUnmatchedArgs(); + PCJOptimizerUtilities.relocateFilters(matcher.getFilters()); + + for (TupleExpr tupleExpr : unmatched) { + tupleExpr.visit(this); + } + } + + @Override + public void meet(LeftJoin node) { + PCJMatcher matcher = PCJMatcherFactory.getPCJMatcher(node); + for (ExternalTupleSet pcj : pcjs) { + matcher.matchPCJ(pcj); + } + + node.replaceWith(matcher.getQuery()); + Set<TupleExpr> unmatched = matcher.getUnmatchedArgs(); + PCJOptimizerUtilities.relocateFilters(matcher.getFilters()); + + for (TupleExpr tupleExpr : unmatched) { + tupleExpr.visit(this); + } + } + + @Override + public void meet(Filter node) { + PCJMatcher matcher = PCJMatcherFactory.getPCJMatcher(node); + for (ExternalTupleSet pcj : pcjs) { + matcher.matchPCJ(pcj); + } + + node.replaceWith(matcher.getQuery()); + Set<TupleExpr> unmatched = matcher.getUnmatchedArgs(); + PCJOptimizerUtilities.relocateFilters(matcher.getFilters()); + + for (TupleExpr tupleExpr : unmatched) { + tupleExpr.visit(this); + } + } + + } + + /** + * + * + * @param conf + * - client configuration + * + * @return - list of {@link ExternalTupleSet}s or PCJs that are either + * specified by user in Configuration or exist in system. + * + * @throws MalformedQueryException + * @throws SailException + * @throws QueryEvaluationException + * @throws TableNotFoundException + * @throws AccumuloException + * @throws AccumuloSecurityException + * @throws PcjException + */ + private static List<ExternalTupleSet> getAccIndices(Configuration conf) + throws MalformedQueryException, SailException, + QueryEvaluationException, TableNotFoundException, + AccumuloException, AccumuloSecurityException, PcjException { + + requireNonNull(conf); + String tablePrefix = requireNonNull(conf + .get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX)); + Connector conn = requireNonNull(ConfigUtils.getConnector(conf)); + List<String> tables = null; + + if (conf instanceof RdfCloudTripleStoreConfiguration) { + tables = ((RdfCloudTripleStoreConfiguration) conf).getPcjTables(); + } + // this maps associates pcj table name with pcj sparql query + Map<String, String> indexTables = Maps.newLinkedHashMap(); + PrecomputedJoinStorage storage = new AccumuloPcjStorage(conn, tablePrefix); + PcjTableNameFactory pcjFactory = new PcjTableNameFactory(); + + boolean tablesProvided = tables != null && !tables.isEmpty(); + + if (tablesProvided) { + //if tables provided, associate table name with sparql + for (final String table : tables) { + indexTables.put(table, storage.getPcjMetadata(pcjFactory.getPcjId(table)).getSparql()); + } + } else { + //if no tables are provided by user, get ids for rya instance id, create table name, + //and associate table name with sparql + List<String> ids = storage.listPcjs(); + for(String id: ids) { + indexTables.put(pcjFactory.makeTableName(tablePrefix, id), storage.getPcjMetadata(id).getSparql()); + } + } + + //use table name sparql map (indexTables) to create {@link AccumuloIndexSet} + final List<ExternalTupleSet> index = Lists.newArrayList(); + if (indexTables.isEmpty()) { + System.out.println("No Index found"); + } else { + for (final String table : indexTables.keySet()) { + final String indexSparqlString = indexTables.get(table); + index.add(new AccumuloIndexSet(indexSparqlString, conf, table)); + } + } + return index; + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e4ef9a96/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java index 653b4cc..248f724 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java @@ -18,6 +18,8 @@ */ package org.apache.rya.indexing.pcj.storage.accumulo; +import static java.util.Objects.requireNonNull; + import java.util.UUID; /** @@ -70,4 +72,14 @@ public class PcjTableNameFactory { return makeTableName(tablePrefix, uniqueId); } + /** + * Get the PCJ ID portion of a PCJ table name. + * + * @param pcjTableName - The PCJ table name. (not null) + * @return The PCJ ID that was in the table name. + */ + public String getPcjId(final String pcjTableName) { + requireNonNull(pcjTableName); + return pcjTableName.split("INDEX_")[1]; + } } \ No newline at end of file