Added OPTIONAL support for Precomputed-Joins, including support for matching PCJs with OPTIONALs and evaluation of query plans containing PCJs with OPTIONALs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/96dd55ec Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/96dd55ec Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/96dd55ec Branch: refs/heads/develop Commit: 96dd55ec505c3e207451702d99a7d296374ff850 Parents: 14073a2 Author: Caleb Meier <[email protected]> Authored: Mon Apr 11 16:23:04 2016 -0400 Committer: Caleb Meier <[email protected]> Committed: Mon May 16 18:29:50 2016 -0400 ---------------------------------------------------------------------- .../iterators/BindingSetHashJoinIterator.java | 324 +++ .../pcj/iterators/IteratorCombiner.java | 107 + .../PCJKeyToCrossProductBindingSetIterator.java | 267 ++ .../PCJKeyToJoinBindingSetIterator.java | 199 ++ .../accumulo/precompQuery/AccumuloPcjQuery.java | 355 --- .../GeneralizedExternalProcessor.java | 2 +- .../IndexedExecutionPlanGenerator.java | 3 +- .../main/java/mvm/rya/indexing/PcjQuery.java | 40 - .../mvm/rya/indexing/accumulo/ConfigUtils.java | 67 +- .../accumulo/entity/AccumuloDocIdIndexer.java | 119 +- .../indexing/external/PrecompJoinOptimizer.java | 883 ------- .../external/QueryVariableNormalizer.java | 1180 --------- .../external/tupleSet/AccumuloIndexSet.java | 732 ++++-- .../external/tupleSet/ExternalTupleSet.java | 43 +- .../tupleSet/SimpleExternalTupleSet.java | 2 - .../pcj/matching/AbstractPCJMatcher.java | 126 + .../pcj/matching/AbstractQuerySegment.java | 122 + .../pcj/matching/FlattenedOptional.java | 331 +++ .../rya/indexing/pcj/matching/JoinSegment.java | 130 + .../pcj/matching/JoinSegmentPCJMatcher.java | 101 + .../pcj/matching/OptionalJoinSegment.java | 146 ++ .../matching/OptionalJoinSegmentPCJMatcher.java | 142 ++ .../rya/indexing/pcj/matching/PCJMatcher.java | 76 + .../pcj/matching/PCJMatcherFactory.java | 73 + .../pcj/matching/PCJNodeConsolidator.java | 628 +++++ .../rya/indexing/pcj/matching/PCJOptimizer.java | 348 +++ .../pcj/matching/PCJOptimizerUtilities.java | 346 +++ .../pcj/matching/QueryNodesToTupleExpr.java | 194 ++ .../rya/indexing/pcj/matching/QuerySegment.java | 83 + .../pcj/matching/QueryVariableNormalizer.java | 1180 +++++++++ .../pcj/matching/TopOfQueryFilterRelocator.java | 97 + .../IndexPlanValidatorTest.java | 58 +- .../ThreshholdPlanSelectorTest.java | 4 +- .../external/AccumuloPcjIntegrationTest.java | 17 +- .../indexing/external/PCJOptionalTestIT.java | 31 +- .../PrecompJoinOptimizerIntegrationTest.java | 211 +- .../external/PrecompJoinOptimizerTest.java | 857 +++---- .../external/PrecompJoinOptimizerTest2.java | 33 +- .../PrecompJoinOptimizerVarToConstTest.java | 16 +- .../external/tupleSet/AccumuloIndexSetTest.java | 2371 +++++++++++++----- .../tupleSet/QueryVariableNormalizerTest.java | 2 +- .../VarConstQueryVariableNormalizerTest.java | 2 +- .../pcj/matching/FlattenedOptionalTest.java | 133 + .../pcj/matching/JoinSegmentPCJMatcherTest.java | 214 ++ .../indexing/pcj/matching/JoinSegmentTest.java | 204 ++ .../OptionalJoinSegmentPCJMatcherTest.java | 274 ++ .../pcj/matching/OptionalJoinSegmentTest.java | 122 + .../pcj/matching/PCJNodeConsolidatorTest.java | 459 ++++ .../indexing/pcj/matching/PCJOptimizerTest.java | 463 ++++ .../pcj/matching/PCJOptimizerUtilitesTest.java | 96 + 50 files changed, 9921 insertions(+), 4092 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/96dd55ec/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/BindingSetHashJoinIterator.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/BindingSetHashJoinIterator.java b/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/BindingSetHashJoinIterator.java new file mode 100644 index 0000000..0966903 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/BindingSetHashJoinIterator.java @@ -0,0 +1,324 @@ +package mvm.rya.accumulo.pcj.iterators; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import info.aduna.iteration.CloseableIteration; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.Set; + +import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; + +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; + +/** + * This {@link CloseableIteration} performs a hash join by joining each + * {@link Map.Entry<String, BindingSet>} with all corresponding + * {@link BindingSet} in a Multimap with the same String key. + * + */ +public class BindingSetHashJoinIterator implements + CloseableIteration<BindingSet, QueryEvaluationException> { + + //BindingSets passed to PCJ mapped according to values + //associated with common variables with table + private Multimap<String, BindingSet> bindingJoinVarHash; + //BindingSets taken from PCJ table + private CloseableIteration<Map.Entry<String, BindingSet>, QueryEvaluationException> joinIter; + private Iterator<BindingSet> joinedBindingSets = Collections + .emptyIterator(); + //If PCJ contains LeftJoin, this is a set of variable in LeftJoin. Used when performing Join. + private Set<String> unAssuredVariables; + //indicates when HashJoin formed from a single collection of join variable or if the size and + //collection of join variables varies -- this is to optimize the join process + private HashJoinType type; + private final BindingSet EMPTY_BINDINGSET = new QueryBindingSet(); + private BindingSet next; + private boolean hasNextCalled = false; + private boolean isEmpty = false; + + /** + * Enum type to indicate whether HashJoin will be performed over a fixed + * subset of variables common to each {@link BindingSet}, or if there is a + * collection of variable subsets over which to join. + * + */ + public enum HashJoinType { + CONSTANT_JOIN_VAR, VARIABLE_JOIN_VAR + }; + + public BindingSetHashJoinIterator( + Multimap<String, BindingSet> bindingJoinVarHash, + CloseableIteration<Map.Entry<String, BindingSet>, QueryEvaluationException> joinIter, + Set<String> unAssuredVariables, HashJoinType type) { + this.bindingJoinVarHash = bindingJoinVarHash; + this.joinIter = joinIter; + this.type = type; + this.unAssuredVariables = unAssuredVariables; + } + + @Override + public boolean hasNext() throws QueryEvaluationException { + if (!hasNextCalled && !isEmpty) { + while (joinedBindingSets.hasNext() || joinIter.hasNext()) { + if (!joinedBindingSets.hasNext()) { + Entry<String, BindingSet> entry = joinIter.next(); + joinedBindingSets = joinBindingSetEntry(entry); + } + if (!joinedBindingSets.hasNext()) { + continue; + } + next = joinedBindingSets.next(); + hasNextCalled = true; + return true; + } + + isEmpty = true; + return false; + } else if (isEmpty) { + return false; + } else { + return true; + } + } + + @Override + public BindingSet next() throws QueryEvaluationException { + if (hasNextCalled) { + hasNextCalled = false; + } else if (isEmpty) { + throw new NoSuchElementException(); + } else { + if (this.hasNext()) { + hasNextCalled = false; + } else { + throw new NoSuchElementException(); + } + } + return next; + } + + @Override + public void remove() throws QueryEvaluationException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws QueryEvaluationException { + joinIter.close(); + } + + /** + * This method takes the valOrderString, which is a key used for computing + * hash joins, and generates multiple keys by pulling off one delimiter + * separated component at a time. This is used when the size of the join key + * varies from {@link Map.Entry} to Entry. It allows the BindingSet to be + * joined using all prefixes of the key. + * + * @param valOrderString + * - key used for hash join + * @return + */ + private List<String> getValueOrders(String valOrderString) { + + List<String> valueOrders = new ArrayList<>(); + String[] splitValOrderString = valOrderString + .split(ExternalTupleSet.VALUE_DELIM); + StringBuffer buffer = new StringBuffer(); + buffer.append(splitValOrderString[0]); + valueOrders.add(buffer.substring(0)); + + for (int i = 1; i < splitValOrderString.length; i++) { + buffer.append(ExternalTupleSet.VALUE_DELIM + splitValOrderString[i]); + valueOrders.add(buffer.substring(0)); + } + + return valueOrders; + } + + /** + * This method verifies that all common variables have a common value and + * then joins the BindingSets together. In the case that the PCJ contains a + * LeftJoin, if the leftBs and rightBs have a common variable with distinct + * values and that common variable is unassured (only appears in LeftJoin), + * this method uses the value corresponding to leftBs. + * + * @param leftBs + * - BindingSet passed into PCJ + * @param rightBs + * - PCJ BindingSet + * @return - joined BindingSet + */ + private BindingSet joinBindingSets(BindingSet leftBs, BindingSet rightBs) { + + Set<String> commonVars = Sets.intersection(leftBs.getBindingNames(), + rightBs.getBindingNames()); + // compare values associated with common variables to make sure + // BindingSets can be joined. Possible for leftBs and rightBs + // to have a common unAssuredVariable in event PCJ contains LeftJoin. + // if values corresponding to common unAssuredVariable do not agree + // add value corresponding to leftBs + for (String s : commonVars) { + if (!leftBs.getValue(s).equals(rightBs.getValue(s)) + && !unAssuredVariables.contains(s)) { + return EMPTY_BINDINGSET; + } + } + QueryBindingSet bs = new QueryBindingSet(removeConstants(leftBs)); + + rightBs = removeConstants(rightBs); + // only add Bindings corresponding to variables that have no value + // assigned. This takes into account case where leftBs and rightBs + // share a common, unAssuredVariable. In this case, use value + // corresponding + // to leftBs, which is effectively performing a LeftJoin. + for (String s : rightBs.getBindingNames()) { + if (bs.getValue(s) == null) { + bs.addBinding(s, rightBs.getValue(s)); + } + } + + return bs; + } + + private BindingSet removeConstants(BindingSet bs) { + QueryBindingSet bSet = new QueryBindingSet(); + for (String s : bs.getBindingNames()) { + if (!s.startsWith(ExternalTupleSet.CONST_PREFIX)) { + bSet.addBinding(bs.getBinding(s)); + } + } + return bSet; + } + + /** + * This method returns an Iterator which joins the given Entry's BindingSet + * to all BindingSets which matching the Entry's key. + * + * @param entry - entry to be joined + * @return - Iterator over joined BindingSets + */ + private Iterator<BindingSet> joinBindingSetEntry( + Map.Entry<String, BindingSet> entry) { + + List<Collection<BindingSet>> matches = new ArrayList<>(); + if (type == HashJoinType.CONSTANT_JOIN_VAR) { + if (bindingJoinVarHash.containsKey(entry.getKey())) { + matches.add(bindingJoinVarHash.get(entry.getKey())); + } + } else { + List<String> valOrders = getValueOrders(entry.getKey()); + for (String s : valOrders) { + if (bindingJoinVarHash.containsKey(s)) { + matches.add(bindingJoinVarHash.get(s)); + } + } + } + + if (matches.size() == 0) { + return Collections.emptyIterator(); + } else { + return new BindingSetCollectionsJoinIterator(entry.getValue(), + matches); + } + + } + + /** + * Given a BindingSet and a List of Collections of BindingSets, this + * Iterator joins the BindingSet with the BindingSets in each Collection + * + */ + private class BindingSetCollectionsJoinIterator implements + Iterator<BindingSet> { + + private Iterator<Collection<BindingSet>> collectionIter; + private Iterator<BindingSet> bsIter = Collections.emptyIterator(); + private BindingSet next; + private BindingSet joinBs; + private boolean hasNextCalled = false; + private boolean isEmpty = false; + + public BindingSetCollectionsJoinIterator(BindingSet bs, + List<Collection<BindingSet>> collection) { + this.collectionIter = collection.iterator(); + this.joinBs = bs; + } + + @Override + public boolean hasNext() { + + if (!hasNextCalled && !isEmpty) { + while (bsIter.hasNext() || collectionIter.hasNext()) { + if (!bsIter.hasNext()) { + bsIter = collectionIter.next().iterator(); + } + next = joinBindingSets(bsIter.next(), joinBs); + if (next == EMPTY_BINDINGSET) { + continue; + } + hasNextCalled = true; + return true; + } + isEmpty = true; + return false; + } else if (isEmpty) { + return false; + } else { + return true; + } + } + + @Override + public BindingSet next() { + if (hasNextCalled) { + hasNextCalled = false; + } else if (isEmpty) { + throw new NoSuchElementException(); + } else { + if (this.hasNext()) { + hasNextCalled = false; + } else { + throw new NoSuchElementException(); + } + } + return next; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/96dd55ec/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/IteratorCombiner.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/IteratorCombiner.java b/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/IteratorCombiner.java new file mode 100644 index 0000000..2407865 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/IteratorCombiner.java @@ -0,0 +1,107 @@ +package mvm.rya.accumulo.pcj.iterators; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import info.aduna.iteration.CloseableIteration; + +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; + +import com.google.common.base.Preconditions; + +/** + * This {@link CloseableIteration} takes in a list of CloseableIterations + * and merges them together into a single CloseableIteration. + * + */ +public class IteratorCombiner implements + CloseableIteration<BindingSet, QueryEvaluationException> { + + + private Collection<CloseableIteration<BindingSet, QueryEvaluationException>> iterators; + private Iterator<CloseableIteration<BindingSet, QueryEvaluationException>> iteratorIterator; + private CloseableIteration<BindingSet, QueryEvaluationException> currIter; + private boolean isEmpty = false; + private boolean hasNextCalled = false; + private BindingSet next; + + public IteratorCombiner(Collection<CloseableIteration<BindingSet, QueryEvaluationException>> iterators) { + Preconditions.checkArgument(iterators.size() > 0); + this.iterators = iterators; + iteratorIterator = iterators.iterator(); + currIter = iteratorIterator.next(); + } + + @Override + public boolean hasNext() throws QueryEvaluationException { + if (!hasNextCalled && !isEmpty) { + while (currIter.hasNext() || iteratorIterator.hasNext()) { + if(!currIter.hasNext()) { + currIter = iteratorIterator.next(); + } + if(!currIter.hasNext()) { + continue; + } + next = currIter.next(); + hasNextCalled = true; + return true; + } + isEmpty = true; + return false; + } else if (isEmpty) { + return false; + } else { + return true; + } + } + + @Override + public BindingSet next() throws QueryEvaluationException { + if (hasNextCalled) { + hasNextCalled = false; + } else if (isEmpty) { + throw new NoSuchElementException(); + } else { + if (this.hasNext()) { + hasNextCalled = false; + } else { + throw new NoSuchElementException(); + } + } + return next; + } + + @Override + public void remove() throws QueryEvaluationException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws QueryEvaluationException { + for(CloseableIteration<BindingSet, QueryEvaluationException> iterator: iterators) { + iterator.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/96dd55ec/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/PCJKeyToCrossProductBindingSetIterator.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/PCJKeyToCrossProductBindingSetIterator.java b/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/PCJKeyToCrossProductBindingSetIterator.java new file mode 100644 index 0000000..0c7369c --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/PCJKeyToCrossProductBindingSetIterator.java @@ -0,0 +1,267 @@ +package mvm.rya.accumulo.pcj.iterators; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import info.aduna.iteration.CloseableIteration; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; + +import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; + +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.openrdf.model.Value; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +import com.google.common.collect.HashBiMap; + +/** + * This class takes in a {@link Scanner} and a Collection of BindingSets, + * deserializes each {@link Map.Entry<Key,Value>} taken from the Scanner into + * a {@link BindingSet}, and performs a cross product on the BindingSet with + * each BindingSet in the provided Collection. The user can also specify a + * {@link Map<String, Value>} of constant constraints that can be used to filter. + * + */ +public class PCJKeyToCrossProductBindingSetIterator implements + CloseableIteration<BindingSet, QueryEvaluationException> { + + //BindingSets passed to PCJ used to form cross product + private List<BindingSet> crossProductBs; + //Scanner over PCJ table + private Scanner scanner; + //Iterator over PCJ scanner + private Iterator<Map.Entry<Key, org.apache.accumulo.core.data.Value>> iterator; + //Map of PCJ variables in table to variable in query + private Map<String, String> pcjVarMap; + //if PCJ contains LeftJoin, this is a set of variables that only appear in + //LeftJoin. Used when performing the cross product. + private Set<String> unAssuredVariables; + private final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); + private final BindingSet EMPTY_BINDINGSET = new QueryBindingSet(); + private Iterator<BindingSet> crossProductIter = Collections.emptyIterator(); + private Map<String, Value> constantConstraints; + private BindingSet next; + private boolean hasNextCalled = false; + private boolean isEmpty = false; + private boolean crossProductBsExist = false; + private boolean constantConstraintsExist = false; + + public PCJKeyToCrossProductBindingSetIterator(Scanner scanner, + List<BindingSet> crossProductBs, + Map<String, Value> constantConstraints, Set<String> unAssuredVariables, + Map<String, String> pcjVarMap) { + this.crossProductBs = crossProductBs; + this.scanner = scanner; + this.iterator = scanner.iterator(); + this.pcjVarMap = HashBiMap.create(pcjVarMap).inverse(); + this.constantConstraints = constantConstraints; + this.crossProductBsExist = crossProductBs.size() > 0; + this.constantConstraintsExist = constantConstraints.size() > 0; + this.unAssuredVariables = unAssuredVariables; + } + + @Override + public boolean hasNext() throws QueryEvaluationException { + if (!hasNextCalled && !isEmpty) { + if (crossProductBsExist) { + while (crossProductIter.hasNext() || iterator.hasNext()) { + if (!crossProductIter.hasNext()) { + Key key = iterator.next().getKey(); + try { + crossProductIter = getCrossProducts(getBindingSet(key)); + } catch (BindingSetConversionException e) { + throw new QueryEvaluationException(e); + } + } + if (!crossProductIter.hasNext()) { + continue; + } + next = crossProductIter.next(); + hasNextCalled = true; + return true; + } + } else { + while (iterator.hasNext()) { + Key key = iterator.next().getKey(); + try { + next = getBindingSet(key); + } catch (BindingSetConversionException e) { + throw new QueryEvaluationException(e); + } + //BindingSet cannot be deserialized or is filtered + //out by constant constraints + if (next == null || next == EMPTY_BINDINGSET) { + continue; + } + hasNextCalled = true; + return true; + } + } + isEmpty = true; + return false; + } else if (isEmpty) { + return false; + } else { + return true; + } + } + + @Override + public BindingSet next() throws QueryEvaluationException { + if (hasNextCalled) { + hasNextCalled = false; + } else if (isEmpty) { + throw new NoSuchElementException(); + } else { + if (this.hasNext()) { + hasNextCalled = false; + } else { + throw new NoSuchElementException(); + } + } + return next; + } + + @Override + public void remove() throws QueryEvaluationException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws QueryEvaluationException { + scanner.close(); + } + + /** + * + * @param key + * - Accumulo key obtained from scan + * @return - BindingSet satisfying any specified constant constraints + * @throws BindingSetConversionException + * @throws QueryEvaluationException + */ + private BindingSet getBindingSet(Key key) + throws BindingSetConversionException, QueryEvaluationException { + byte[] row = key.getRow().getBytes(); + String[] varOrder = key.getColumnFamily().toString() + .split(ExternalTupleSet.VAR_ORDER_DELIM); + + BindingSet bindingSet = converter.convert(row, new VariableOrder( + varOrder)); + + QueryBindingSet bs = new QueryBindingSet(); + for (String var : bindingSet.getBindingNames()) { + String mappedVar = null; + if(pcjVarMap.containsKey(var)) { + mappedVar = pcjVarMap.get(var); + } else { + throw new QueryEvaluationException("PCJ Variable has no mapping to query variable."); + } + if (constantConstraintsExist) { + if (mappedVar.startsWith(ExternalTupleSet.CONST_PREFIX) + && constantConstraints.containsKey(mappedVar) + && !constantConstraints.get(mappedVar).equals( + bindingSet.getValue(var))) { + return EMPTY_BINDINGSET; + } + } + + if (!mappedVar.startsWith(ExternalTupleSet.CONST_PREFIX)) { + bs.addBinding(mappedVar, bindingSet.getValue(var)); + } + } + return bs; + } + + /** + * This method forms the cross-product between an input BindingSet and the + * BindingSets contained in crossProdcutBs. + * + * @param bs + * - {@link BindingSet} used to form cross product with + * cross-product BindingSets + * @return - Iterator over resulting cross-product + */ + private Iterator<BindingSet> getCrossProducts(BindingSet bs) { + Set<BindingSet> crossProducts = new HashSet<BindingSet>(); + + for (BindingSet bSet : crossProductBs) { + BindingSet prod = takeCrossProduct(bSet, bs); + if (prod != EMPTY_BINDINGSET) { + crossProducts.add(prod); + } + } + + return crossProducts.iterator(); + + } + + /** + * This method compute the cross product of the BindingSet passed to the PCJ + * and the PCJ BindingSet. It verifies that only common variables are unassured + * variables, and if leftBs and rightBs have distinct values for a given variable, + * this method uses the value from leftBs in the cross product BindingSet - this + * is effectively performing a LeftJoin. + * + * @param leftBs - BindingSet passed to PCJ + * @param rightBs - PCJ BindingSet + * @return - cross product BindingSet + */ + private BindingSet takeCrossProduct(BindingSet leftBs, BindingSet rightBs) { + if (bindingSetsIntersect(leftBs, rightBs)) { + return EMPTY_BINDINGSET; + } + QueryBindingSet bs = new QueryBindingSet(leftBs); + + //only add Bindings corresponding to variables that have no value + //assigned. This takes into account case where leftBs and rightBs + //share a common, unAssuredVariable. In this case, use value corresponding + //to leftBs, which is effectively performing a LeftJoin. + for(String s: rightBs.getBindingNames()) { + if(bs.getValue(s) == null) { + bs.addBinding(s, rightBs.getValue(s)); + } + } + return bs; + } + + private boolean bindingSetsIntersect(BindingSet bs1, BindingSet bs2) { + + for(String s: bs1.getBindingNames()) { + if(bs2.getValue(s) != null && !unAssuredVariables.contains(s)) { + return true; + } + } + return false; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/96dd55ec/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/PCJKeyToJoinBindingSetIterator.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/PCJKeyToJoinBindingSetIterator.java b/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/PCJKeyToJoinBindingSetIterator.java new file mode 100644 index 0000000..1b821d4 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/PCJKeyToJoinBindingSetIterator.java @@ -0,0 +1,199 @@ +package mvm.rya.accumulo.pcj.iterators; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import info.aduna.iteration.CloseableIteration; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; + +import mvm.rya.api.RdfCloudTripleStoreUtils; +import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; + +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.openrdf.model.Value; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +import com.google.common.base.Preconditions; +import com.google.common.collect.HashBiMap; + +/** + * This class takes in a {@link Scanner} and a Collection of BindingSets, + * deserializes each {@link Map.Entry<Key,Value>} taken from the Scanner into a + * {@link BindingSet}, and creates a {@link Map.Entry<String, BindingSet>} + * object to perform as hash join. The user can also specify a {@link Map + * <String, Value>} of constant constraints that can be used to filter. + * + */ +public class PCJKeyToJoinBindingSetIterator + implements + CloseableIteration<Map.Entry<String, BindingSet>, QueryEvaluationException> { + + //map of variables as they appear in PCJ table to query variables + private Map<String, String> pcjVarMap; + //constant constraints used for filtering + private Map<String, Value> constantConstraints; + //max number of variables an entry in the batch of BindingSets had in common with PCJ + //this is used for constructing hash join key. + private int maxPrefixLen; + private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); + private final Map.Entry<String, BindingSet> EMPTY_ENTRY = new RdfCloudTripleStoreUtils.CustomEntry<String, BindingSet>( + "", new QueryBindingSet()); + private Iterator<Entry<Key, org.apache.accumulo.core.data.Value>> iterator; + private boolean hasNextCalled = false; + private boolean isEmpty = false; + private Map.Entry<String, BindingSet> next; + private BatchScanner scanner; + + public PCJKeyToJoinBindingSetIterator(BatchScanner scanner, + Map<String, String> pcjVarMap, + Map<String, Value> constantConstraints, int maxPrefixLen) { + Preconditions.checkNotNull(scanner); + Preconditions.checkArgument(pcjVarMap.size() > 0, + "Variable map must contain at least one variable!"); + Preconditions.checkNotNull(constantConstraints, + "Constant constraints cannot be null."); + Preconditions.checkArgument(maxPrefixLen > 0, + "Max prefix length must be greater than 0."); + Preconditions + .checkArgument(maxPrefixLen <= pcjVarMap.size(), + "Max prefix length must be less than total number of binding names."); + this.scanner = scanner; + this.pcjVarMap = HashBiMap.create(pcjVarMap).inverse(); + this.constantConstraints = constantConstraints; + this.maxPrefixLen = maxPrefixLen; + this.iterator = scanner.iterator(); + + } + + public PCJKeyToJoinBindingSetIterator(BatchScanner scanner, + Map<String, String> pcjVarMap, int maxPrefixLen) { + this(scanner, pcjVarMap, new HashMap<String, Value>(), maxPrefixLen); + } + + @Override + public boolean hasNext() throws QueryEvaluationException { + + if (!hasNextCalled && !isEmpty) { + while (iterator.hasNext()) { + Key key = iterator.next().getKey(); + // get bindings from scan without values associated with + // constant constraints + try { + next = getBindingSetEntryAndMatchConstants(key); + } catch (BindingSetConversionException e) { + throw new QueryEvaluationException( + "Could not deserialize PCJ BindingSet."); + } + // skip key if constant constraint don't match + if (next == EMPTY_ENTRY) { + continue; + } + hasNextCalled = true; + return true; + } + isEmpty = true; + return false; + } else if (isEmpty) { + return false; + } else { + return true; + } + } + + @Override + public Entry<String, BindingSet> next() throws QueryEvaluationException { + if (hasNextCalled) { + hasNextCalled = false; + } else if (isEmpty) { + throw new NoSuchElementException(); + } else { + if (this.hasNext()) { + hasNextCalled = false; + } else { + throw new NoSuchElementException(); + } + } + return next; + } + + @Override + public void remove() throws QueryEvaluationException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws QueryEvaluationException { + scanner.close(); + } + + /** + * + * @param key + * - Accumulo key obtained from scan + * @return - Entry<String,BindingSet> satisfying the constant constraints + * @throws BindingSetConversionException + */ + private Map.Entry<String, BindingSet> getBindingSetEntryAndMatchConstants( + Key key) throws BindingSetConversionException { + byte[] row = key.getRow().getBytes(); + String[] varOrder = key.getColumnFamily().toString() + .split(ExternalTupleSet.VAR_ORDER_DELIM); + + BindingSet bindingSet = converter.convert(row, new VariableOrder( + varOrder)); + + QueryBindingSet bs = new QueryBindingSet(); + for (String var : bindingSet.getBindingNames()) { + String mappedVar = pcjVarMap.get(var); + if (mappedVar.startsWith(ExternalTupleSet.CONST_PREFIX) + && constantConstraints.containsKey(mappedVar) + && !constantConstraints.get(mappedVar).equals( + bindingSet.getValue(var))) { + return EMPTY_ENTRY; + } else { + bs.addBinding(mappedVar, bindingSet.getValue(var)); + } + } + + String orderedValueString = bindingSet.getValue(varOrder[0]).toString(); + for (int i = 1; i < maxPrefixLen; i++) { + Value value = bindingSet.getValue(varOrder[i]); + if (value != null) { + orderedValueString = orderedValueString + + ExternalTupleSet.VALUE_DELIM + value.toString(); + } + } + + return new RdfCloudTripleStoreUtils.CustomEntry<String, BindingSet>( + orderedValueString, bs); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/96dd55ec/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java b/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java deleted file mode 100644 index f6b7819..0000000 --- a/extras/indexing/src/main/java/mvm/rya/accumulo/precompQuery/AccumuloPcjQuery.java +++ /dev/null @@ -1,355 +0,0 @@ -package mvm.rya.accumulo.precompQuery; - -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NoSuchElementException; -import java.util.Set; - -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.io.Text; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; -import org.openrdf.query.BindingSet; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.algebra.evaluation.QueryBindingSet; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * - */ -import info.aduna.iteration.CloseableIteration; -import info.aduna.iteration.Iteration; -import mvm.rya.api.resolver.RyaTypeResolverException; -import mvm.rya.indexing.PcjQuery; -import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet; -import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; - -/** - * This class encapsulates how pre-computed join tables are used during query - * evaluation. The method - * {@link AccumuloPcjQuery#queryPrecompJoin(List, String, Map, Map, Collection) - * is used by {@link AccumuloIndexSet#evaluate(BindingSet)} to evaluate the - * {@link AccumuloIndexSet#getTupleExpr()} associated with the Accumulo - * pre-computed join table. Given the {@link BindingSet} constraints, it uses - * the variables common to the BindingSet constraints and the pre-computed join - * table TupleExpr to build a {@Range} prefix to scan the pre-computed - * join table to obtain results for the constrained sub-query. - * - */ -public class AccumuloPcjQuery implements PcjQuery { - private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); - - private final Connector accCon; - private final String tableName; - - public AccumuloPcjQuery(Connector accCon, String tableName) { - this.accCon = accCon; - this.tableName = tableName; - } - - /** - * @param commonVars - variables common to bsConstraints and table in terms of query variables - * @param localityGroup - the column family to scan in terms of table variables - * @param valMap - Literal type associated with constant constraints - * @param varMap - map query variables to pre-computed join table variables - * @param bsConstraints - binding set constraints - * @return {@link Iteration} over result BindingSets - */ - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> queryPrecompJoin( - List<String> commonVars, String localityGroup, - Map<String, org.openrdf.model.Value> valMap, - Map<String, String> varMap, Collection<BindingSet> bsConstraints) - throws QueryEvaluationException, TableNotFoundException { - - final Iterator<Entry<Key, Value>> accIter; - final Map<String, org.openrdf.model.Value> constValMap = valMap; - final HashMultimap<Range, BindingSet> map = HashMultimap.create(); - - final List<BindingSet> extProdList = Lists.newArrayList(); - final List<String> prefixVars = commonVars; - final BatchScanner bs = accCon.createBatchScanner(tableName, - new Authorizations(), 10); - final Set<Range> ranges = Sets.newHashSet(); - final Map<String, String> tableVarMap = varMap; - final boolean bsContainsPrefixVar = bindingsContainsPrefixVar( - bsConstraints, prefixVars); - - bs.fetchColumnFamily(new Text(localityGroup)); - // process bindingSet and constant constraints - for (final BindingSet bSet : bsConstraints) { - // bindings sets and PCJ have common vars - if (bsContainsPrefixVar) { - byte[] rangePrefix = null; - final QueryBindingSet rangeBs = new QueryBindingSet(); - for (final String var : prefixVars) { - if (var.startsWith(ExternalTupleSet.CONST_PREFIX)) { - rangeBs.addBinding(var, constValMap.get(var)); - } else { - rangeBs.addBinding(var, bSet.getBinding(var).getValue()); - } - } - try { - rangePrefix = converter.convert(rangeBs, new VariableOrder(commonVars)); - } catch (final BindingSetConversionException e) { - throw new QueryEvaluationException(e); - } - final Range r = Range.prefix(new Text(rangePrefix)); - map.put(r, bSet); - ranges.add(r); - // non-empty binding sets and no common vars with no constant - // constraints - } else if (bSet.size() > 0 && prefixVars.size() == 0) { - extProdList.add(bSet); - } - } - - // constant constraints and no bindingSet constraints - // add range of entire table if no constant constraints and - // bsConstraints consists of single, empty set (occurs when AIS is - // first node evaluated in query) - if (ranges.isEmpty()) { - // constant constraints - if (prefixVars.size() > 0) { - byte[] rangePrefix = null; - final QueryBindingSet rangeBs = new QueryBindingSet(); - for (final String var : prefixVars) { - if (var.startsWith(ExternalTupleSet.CONST_PREFIX)) { - rangeBs.addBinding(var, constValMap.get(var)); - } - } - try { - rangePrefix = converter.convert(rangeBs, new VariableOrder(commonVars)); - } catch (final BindingSetConversionException e) { - throw new QueryEvaluationException(e); - } - final Range r = Range.prefix(new Text(rangePrefix)); - ranges.add(r); - } - // no constant or bindingSet constraints - else { - ranges.add(new Range("", true, "~", false)); - } - } - - if (ranges.size() == 0) { - accIter = null; - } else { - bs.setRanges(ranges); - accIter = bs.iterator(); - } - return new CloseableIteration<BindingSet, QueryEvaluationException>() { - @Override - public void remove() throws QueryEvaluationException { - throw new UnsupportedOperationException(); - } - - private Iterator<BindingSet> inputSet = null; - private QueryBindingSet currentSolutionBs = null; - private boolean hasNextCalled = false; - private boolean isEmpty = false; - - @Override - public BindingSet next() throws QueryEvaluationException { - final QueryBindingSet bs = new QueryBindingSet(); - if (hasNextCalled) { - hasNextCalled = false; - if (inputSet != null) { - bs.addAll(inputSet.next()); - } - bs.addAll(currentSolutionBs); - } else if (isEmpty) { - throw new NoSuchElementException(); - } else { - if (this.hasNext()) { - hasNextCalled = false; - if (inputSet != null) { - bs.addAll(inputSet.next()); - } - bs.addAll(currentSolutionBs); - } else { - throw new NoSuchElementException(); - } - } - return bs; - } - - @Override - public boolean hasNext() throws QueryEvaluationException { - if (accIter == null) { - isEmpty = true; - return false; - } - if (!hasNextCalled && !isEmpty) { - while (accIter.hasNext() || inputSet != null - && inputSet.hasNext()) { - if (inputSet != null && inputSet.hasNext()) { - hasNextCalled = true; - return true; - } - final Key k = accIter.next().getKey(); - // get bindings from scan without values associated with - // constant constraints - BindingSet bs; - try { - bs = getBindingSetWithoutConstants(k, tableVarMap); - } catch (final BindingSetConversionException e) { - throw new QueryEvaluationException(e); - } - currentSolutionBs = new QueryBindingSet(); - currentSolutionBs.addAll(bs); - - // check to see if additional bindingSet constraints - // exist in map - if (map.size() > 0) { - // get prefix range to retrieve remainder of - // bindingSet from map - byte[] rangePrefix; - try { - rangePrefix = getPrefixByte(bs, constValMap, - prefixVars); - } catch (final BindingSetConversionException e) { - throw new QueryEvaluationException(e); - } - final Range r = Range.prefix(new Text(rangePrefix)); - inputSet = map.get(r).iterator(); - if (!inputSet.hasNext()) { - continue; - } else { - hasNextCalled = true; - return true; - } - // check to see if binding set constraints exist, - // but no common vars - } else if (extProdList.size() > 0) { - inputSet = extProdList.iterator(); - hasNextCalled = true; - return true; - } - // no bindingsSet constraints--only constant constraints - // or none - else { - hasNextCalled = true; - return true; - } - } - isEmpty = true; - return false; - } else if (isEmpty) { - return false; - } else { - return true; - } - } - - @Override - public void close() throws QueryEvaluationException { - bs.close(); - } - }; - } - - /** - * - * @param bindingSets - binding set constraints - * @param prefixVars - common prefix variables to table and any constant constraints - * @return true if there are variables common to binding sets and table and false - * if prefixVars only consists of constant constraints - */ - private boolean bindingsContainsPrefixVar( - Collection<BindingSet> bindingSets, List<String> prefixVars) { - final Iterator<BindingSet> iter = bindingSets.iterator(); - if (iter.hasNext()) { - final BindingSet tempBindingSet = iter.next(); - final Set<String> bindings = tempBindingSet.getBindingNames(); - for (final String var : prefixVars) { - if (bindings.contains(var)) { - return true; - } - } - } - return false; - } - - /** - * - * @param bs - binding set from which byte is extracted - * @param valMap - map which associated Value type to constant constraint - * @param prefixVars - prefix of variables common to binding sets and table and constant constraints - * @return - bytes associated with values in bs that are associated with prefixVars - * @throws RyaTypeResolverException - */ - private static byte[] getPrefixByte(BindingSet bs, - Map<String, org.openrdf.model.Value> valMap, List<String> prefixVars) - throws BindingSetConversionException { - final QueryBindingSet bSet = new QueryBindingSet(); - for (final String var : prefixVars) { - if (var.startsWith(ExternalTupleSet.CONST_PREFIX)) { - bSet.addBinding(var, valMap.get(var)); - } else if (bs.getBindingNames().size() > 0 - && bs.getBinding(var) != null) { - bSet.addBinding(var, bs.getBinding(var).getValue()); - } - } - - return converter.convert(bSet, new VariableOrder(prefixVars)); - } - - /** - * - * @param key - Accumulo key obtained from scan - * @param tableVarMap - map that associated query variables and table variables - * @return - BindingSet without values associated with constant constraints - * @throws BindingSetConversionException - */ - private static BindingSet getBindingSetWithoutConstants(Key key, - Map<String, String> tableVarMap) throws BindingSetConversionException { - final byte[] row = key.getRow().getBytes(); - final String[] varOrder = key.getColumnFamily().toString() - .split(ExternalTupleSet.VAR_ORDER_DELIM); - - BindingSet bindingSet = converter.convert(row, new VariableOrder(varOrder)); - final QueryBindingSet temp = new QueryBindingSet(bindingSet); - - final QueryBindingSet bs = new QueryBindingSet(); - for (final String var : temp.getBindingNames()) { - if (!tableVarMap.get(var).startsWith(ExternalTupleSet.CONST_PREFIX)) { - bs.addBinding(tableVarMap.get(var), temp.getValue(var)); - } - } - return bs; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/96dd55ec/extras/indexing/src/main/java/mvm/rya/indexing/IndexPlanValidator/GeneralizedExternalProcessor.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/IndexPlanValidator/GeneralizedExternalProcessor.java b/extras/indexing/src/main/java/mvm/rya/indexing/IndexPlanValidator/GeneralizedExternalProcessor.java index 58e84d9..08d52ed 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/IndexPlanValidator/GeneralizedExternalProcessor.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/IndexPlanValidator/GeneralizedExternalProcessor.java @@ -28,8 +28,8 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import mvm.rya.indexing.external.QueryVariableNormalizer.VarCollector; import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; +import mvm.rya.indexing.pcj.matching.QueryVariableNormalizer.VarCollector; import org.openrdf.query.algebra.BindingSetAssignment; import org.openrdf.query.algebra.Filter; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/96dd55ec/extras/indexing/src/main/java/mvm/rya/indexing/IndexPlanValidator/IndexedExecutionPlanGenerator.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/IndexPlanValidator/IndexedExecutionPlanGenerator.java b/extras/indexing/src/main/java/mvm/rya/indexing/IndexPlanValidator/IndexedExecutionPlanGenerator.java index a0fca34..22b1e85 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/IndexPlanValidator/IndexedExecutionPlanGenerator.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/IndexPlanValidator/IndexedExecutionPlanGenerator.java @@ -24,8 +24,8 @@ import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; -import mvm.rya.indexing.external.QueryVariableNormalizer; import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; +import mvm.rya.indexing.pcj.matching.QueryVariableNormalizer; import org.openrdf.query.algebra.Projection; import org.openrdf.query.algebra.TupleExpr; @@ -113,7 +113,6 @@ public class IndexedExecutionPlanGenerator implements ExternalIndexMatcher { try { tupList = QueryVariableNormalizer.getNormalizedIndex(query, e.getTupleExpr()); } catch (final Exception e1) { - // TODO Auto-generated catch block e1.printStackTrace(); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/96dd55ec/extras/indexing/src/main/java/mvm/rya/indexing/PcjQuery.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/PcjQuery.java b/extras/indexing/src/main/java/mvm/rya/indexing/PcjQuery.java deleted file mode 100644 index 57f77cf..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/PcjQuery.java +++ /dev/null @@ -1,40 +0,0 @@ -package mvm.rya.indexing; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import info.aduna.iteration.CloseableIteration; - -import java.util.Collection; -import java.util.List; -import java.util.Map; - -import org.apache.accumulo.core.client.TableNotFoundException; -import org.openrdf.model.Value; -import org.openrdf.query.BindingSet; -import org.openrdf.query.QueryEvaluationException; - -public interface PcjQuery { - - public abstract CloseableIteration<BindingSet, QueryEvaluationException> queryPrecompJoin(List<String> varOrder, - String localityGroup, Map<String, Value> valMap, Map<String, String> varMap, Collection<BindingSet> constraints) throws QueryEvaluationException,TableNotFoundException; - -} - http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/96dd55ec/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java index 6c87182..1b92117 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java @@ -25,6 +25,20 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.indexing.FilterFunctionOptimizer; +import mvm.rya.indexing.accumulo.entity.EntityCentricIndex; +import mvm.rya.indexing.accumulo.entity.EntityOptimizer; +import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; +import mvm.rya.indexing.accumulo.freetext.LuceneTokenizer; +import mvm.rya.indexing.accumulo.freetext.Tokenizer; +import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer; +import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; +import mvm.rya.indexing.mongodb.MongoFreeTextIndexer; +import mvm.rya.indexing.mongodb.MongoGeoIndexer; +import mvm.rya.indexing.pcj.matching.PCJOptimizer; + import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchScanner; @@ -49,20 +63,6 @@ import org.openrdf.model.impl.URIImpl; import com.google.common.collect.Lists; -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.indexing.FilterFunctionOptimizer; -import mvm.rya.indexing.accumulo.entity.EntityCentricIndex; -import mvm.rya.indexing.accumulo.entity.EntityOptimizer; -import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; -import mvm.rya.indexing.accumulo.freetext.LuceneTokenizer; -import mvm.rya.indexing.accumulo.freetext.Tokenizer; -import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer; -import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; -import mvm.rya.indexing.external.PrecompJoinOptimizer; -import mvm.rya.indexing.mongodb.MongoFreeTextIndexer; -import mvm.rya.indexing.mongodb.MongoGeoIndexer; - /** * A set of configuration utils to read a Hadoop {@link Configuration} object and create Cloudbase/Accumulo objects. */ @@ -157,7 +157,7 @@ public class ConfigUtils { return false; } - private static String getIndexTableName(final Configuration conf, final String indexTableNameConf, final String altSuffix){ + private static String getIndexTableName(Configuration conf, String indexTableNameConf, String altSuffix){ String value = conf.get(indexTableNameConf); if (value == null){ final String defaultTableName = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX); @@ -183,12 +183,12 @@ public class ConfigUtils { return getIndexTableName(conf, GEO_TABLENAME, "geo"); } - public static String getTemporalTableName(final Configuration conf) { + public static String getTemporalTableName(Configuration conf) { return getIndexTableName(conf, TEMPORAL_TABLENAME, "temporal"); } - public static String getEntityTableName(final Configuration conf) { + public static String getEntityTableName(Configuration conf) { return getIndexTableName(conf, ENTITY_TABLENAME, "entity"); } @@ -253,10 +253,11 @@ public class ConfigUtils { final Connector connector = ConfigUtils.getConnector(conf); final Authorizations auths = ConfigUtils.getAuthorizations(conf); Integer numThreads = null; - if (conf instanceof RdfCloudTripleStoreConfiguration) + if (conf instanceof RdfCloudTripleStoreConfiguration) { numThreads = ((RdfCloudTripleStoreConfiguration) conf).getNumThreads(); - else + } else { numThreads = conf.getInt(RdfCloudTripleStoreConfiguration.CONF_NUM_THREADS, 2); + } return connector.createBatchScanner(tablename, auths, numThreads); } @@ -341,39 +342,39 @@ public class ConfigUtils { return conf.getInt(GEO_NUM_PARTITIONS, getNumPartitions(conf)); } - public static boolean getUseGeo(final Configuration conf) { + public static boolean getUseGeo(Configuration conf) { return conf.getBoolean(USE_GEO, false); } - public static boolean getUseFreeText(final Configuration conf) { + public static boolean getUseFreeText(Configuration conf) { return conf.getBoolean(USE_FREETEXT, false); } - public static boolean getUseTemporal(final Configuration conf) { + public static boolean getUseTemporal(Configuration conf) { return conf.getBoolean(USE_TEMPORAL, false); } - public static boolean getUseEntity(final Configuration conf) { + public static boolean getUseEntity(Configuration conf) { return conf.getBoolean(USE_ENTITY, false); } - public static boolean getUsePCJ(final Configuration conf) { + public static boolean getUsePCJ(Configuration conf) { return conf.getBoolean(USE_PCJ, false); } - public static boolean getUseOptimalPCJ(final Configuration conf) { + public static boolean getUseOptimalPCJ(Configuration conf) { return conf.getBoolean(USE_OPTIMAL_PCJ, false); } - public static boolean getUseMongo(final Configuration conf) { + public static boolean getUseMongo(Configuration conf) { return conf.getBoolean(USE_MONGO, false); } - public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) { + public static void setIndexers(RdfCloudTripleStoreConfiguration conf) { - final List<String> indexList = Lists.newArrayList(); - final List<String> optimizers = Lists.newArrayList(); + List<String> indexList = Lists.newArrayList(); + List<String> optimizers = Lists.newArrayList(); boolean useFilterIndex = false; @@ -389,7 +390,7 @@ public class ConfigUtils { } else { if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) { - conf.setPcjOptimizer(PrecompJoinOptimizer.class); + conf.setPcjOptimizer(PCJOptimizer.class); } if (getUseGeo(conf)) { @@ -421,5 +422,9 @@ public class ConfigUtils { conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[]{})); conf.setStrings(AccumuloRdfConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[]{})); + } -} \ No newline at end of file + + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/96dd55ec/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java index feb894f..73378cf 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java @@ -8,9 +8,9 @@ package mvm.rya.indexing.accumulo.entity; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -56,7 +56,6 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.openrdf.query.BindingSet; import org.openrdf.query.MalformedQueryException; @@ -75,23 +74,23 @@ import com.google.common.primitives.Bytes; public class AccumuloDocIdIndexer implements DocIdIndexer { - - + + private BatchScanner bs; private AccumuloRdfConfiguration conf; - + public AccumuloDocIdIndexer(RdfCloudTripleStoreConfiguration conf) throws AccumuloException, AccumuloSecurityException { Preconditions.checkArgument(conf instanceof RdfCloudTripleStoreConfiguration, "conf must be isntance of RdfCloudTripleStoreConfiguration"); this.conf = (AccumuloRdfConfiguration) conf; - //Connector conn = ConfigUtils.getConnector(conf); + //Connector conn = ConfigUtils.getConnector(conf); } - - - - + + + + public CloseableIteration<BindingSet, QueryEvaluationException> queryDocIndex(String sparqlQuery, Collection<BindingSet> constraints) throws TableNotFoundException, QueryEvaluationException { - + SPARQLParser parser = new SPARQLParser(); ParsedQuery pq1 = null; try { @@ -99,22 +98,22 @@ public class AccumuloDocIdIndexer implements DocIdIndexer { } catch (MalformedQueryException e) { e.printStackTrace(); } - + TupleExpr te1 = pq1.getTupleExpr(); List<StatementPattern> spList1 = StatementPatternCollector.process(te1); - + if(StarQuery.isValidStarQuery(spList1)) { StarQuery sq1 = new StarQuery(spList1); return queryDocIndex(sq1, constraints); } else { throw new IllegalArgumentException("Invalid star query!"); } - + } - - - - + + + + @Override public CloseableIteration<BindingSet, QueryEvaluationException> queryDocIndex(StarQuery query, Collection<BindingSet> constraints) throws TableNotFoundException, QueryEvaluationException { @@ -132,50 +131,50 @@ public class AccumuloDocIdIndexer implements DocIdIndexer { commonVarNames = Sets.newHashSet(); unCommonVarNames = Sets.newHashSet(); } - + if( commonVarNames.size() == 1 && !query.commonVarConstant() && commonVarNames.contains(query.getCommonVarName())) { - + final HashMultimap<String, BindingSet> map = HashMultimap.create(); final String commonVar = starQ.getCommonVarName(); final Iterator<Entry<Key, Value>> intersections; final BatchScanner scan; Set<Range> ranges = Sets.newHashSet(); - + while(bs.hasNext()) { - + BindingSet currentBs = bs.next(); - + if(currentBs.getBinding(commonVar) == null) { continue; } - + String row = currentBs.getBinding(commonVar).getValue().stringValue(); - ranges.add(new Range(row)); + ranges.add(new Range(row)); map.put(row, currentBs); - + } scan = runQuery(starQ, ranges); intersections = scan.iterator(); - - + + return new CloseableIteration<BindingSet, QueryEvaluationException>() { - + private QueryBindingSet currentSolutionBs = null; private boolean hasNextCalled = false; private boolean isEmpty = false; - private Iterator<BindingSet> inputSet = (new ArrayList<BindingSet>()).iterator(); + private Iterator<BindingSet> inputSet = new ArrayList<BindingSet>().iterator(); private BindingSet currentBs; private Key key; - - - + + + @Override public boolean hasNext() throws QueryEvaluationException { if (!hasNextCalled && !isEmpty) { while (inputSet.hasNext() || intersections.hasNext()) { if (!inputSet.hasNext()) { - key = intersections.next().getKey(); + key = intersections.next().getKey(); inputSet = map.get(key.getRow().toString()).iterator(); } currentBs = inputSet.next(); @@ -199,7 +198,7 @@ public class AccumuloDocIdIndexer implements DocIdIndexer { } - + @Override public BindingSet next() throws QueryEvaluationException { @@ -227,10 +226,10 @@ public class AccumuloDocIdIndexer implements DocIdIndexer { public void close() throws QueryEvaluationException { scan.close(); } - + }; - - + + } else { return new CloseableIteration<BindingSet, QueryEvaluationException>() { @@ -249,7 +248,7 @@ public class AccumuloDocIdIndexer implements DocIdIndexer { private StarQuery sq = new StarQuery(starQ); private Set<Range> emptyRangeSet = Sets.newHashSet(); private BatchScanner scan; - + @Override public BindingSet next() throws QueryEvaluationException { if (hasNextCalled) { @@ -268,7 +267,7 @@ public class AccumuloDocIdIndexer implements DocIdIndexer { @Override public boolean hasNext() throws QueryEvaluationException { - + if (!init) { if (intersections == null && bs.hasNext()) { currentBs = bs.next(); @@ -300,7 +299,7 @@ public class AccumuloDocIdIndexer implements DocIdIndexer { } else { continue; } - + if (sq.commonVarConstant() && currentSolutionBs.size() == unCommonVarNames.size() + sq.getUnCommonVars().size()) { hasNextCalled = true; return true; @@ -327,21 +326,21 @@ public class AccumuloDocIdIndexer implements DocIdIndexer { }; } } - + private QueryBindingSet deserializeKey(Key key, StarQuery sq, BindingSet currentBs, Set<String> unCommonVar) { - - + + QueryBindingSet currentSolutionBs = new QueryBindingSet(); - + Text row = key.getRow(); Text cq = key.getColumnQualifier(); - - + + String[] cqArray = cq.toString().split(DocIndexIteratorUtil.DOC_ID_INDEX_DELIM); boolean commonVarSet = false; - - //if common Var is constant there is no common variable to assign a value to + + //if common Var is constant there is no common variable to assign a value to if(sq.commonVarConstant()) { commonVarSet = true; } @@ -363,7 +362,7 @@ public class AccumuloDocIdIndexer implements DocIdIndexer { byte[] cqContent = Arrays.copyOfRange(cqBytes, secondIndex + 1, typeIndex); byte[] objType = Arrays.copyOfRange(cqBytes, typeIndex, cqBytes.length); - if ((new String(tripleComponent)).equals("object")) { + if (new String(tripleComponent).equals("object")) { byte[] object = Bytes.concat(cqContent, objType); org.openrdf.model.Value v = null; try { @@ -374,7 +373,7 @@ public class AccumuloDocIdIndexer implements DocIdIndexer { } currentSolutionBs.addBinding(s, v); - } else if ((new String(tripleComponent)).equals("subject")) { + } else if (new String(tripleComponent).equals("subject")) { if (!commonVarSet) { byte[] object = Bytes.concat(row.getBytes(), objType); org.openrdf.model.Value v = null; @@ -398,9 +397,9 @@ public class AccumuloDocIdIndexer implements DocIdIndexer { } return currentSolutionBs; } - + private BatchScanner runQuery(StarQuery query, Collection<Range> ranges) throws QueryEvaluationException { - + try { if (ranges.size() == 0) { String rangeText = query.getCommonVarValue(); @@ -411,13 +410,13 @@ public class AccumuloDocIdIndexer implements DocIdIndexer { r = new Range(); } ranges = Collections.singleton(r); - } - + } + Connector accCon = ConfigUtils.getConnector(conf); IteratorSetting is = new IteratorSetting(30, "fii", DocumentIndexIntersectingIterator.class); DocumentIndexIntersectingIterator.setColumnFamilies(is, query.getColumnCond()); - + if(query.hasContext()) { DocumentIndexIntersectingIterator.setContext(is, query.getContextURI()); } @@ -425,9 +424,9 @@ public class AccumuloDocIdIndexer implements DocIdIndexer { new Authorizations(conf.get(ConfigUtils.CLOUDBASE_AUTHS)), 15); bs.addScanIterator(is); bs.setRanges(ranges); - + return bs; - + } catch (TableNotFoundException e) { e.printStackTrace(); } catch (AccumuloException e) { @@ -437,7 +436,7 @@ public class AccumuloDocIdIndexer implements DocIdIndexer { } throw new QueryEvaluationException(); } - + @Override public void close() throws IOException {
