http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c12f58f4/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloPcjSerializer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloPcjSerializer.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloPcjSerializer.java new file mode 100644 index 0000000..5aefc40 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloPcjSerializer.java @@ -0,0 +1,103 @@ +package mvm.rya.indexing.external.tupleSet; + +import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTE; +import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTES; +import static mvm.rya.api.RdfCloudTripleStoreConstants.TYPE_DELIM_BYTE; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.resolver.RdfToRyaConversions; +import mvm.rya.api.resolver.RyaContext; +import mvm.rya.api.resolver.RyaToRdfConversions; +import mvm.rya.api.resolver.RyaTypeResolverException; + +import org.openrdf.model.Value; +import org.openrdf.query.BindingSet; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Bytes; + +/** + * AccumuloPcjSerializer provides two methods, serialize and deserialize, which + * are used for writing BindingSets to PCJ tables and reading serialized byte + * representations of BindingSets from PCJ tables. + * + */ +public class AccumuloPcjSerializer { + + /** + * + * @param bs {@link BindingSet} to be serialized + * @param varOrder order in which binding values should be written to byte array + * @return byte array containing serialized values written in order indicated by varOrder + * @throws RyaTypeResolverException + */ + public static byte[] serialize(BindingSet bs, String[] varOrder) throws RyaTypeResolverException { + byte[] byteArray = null; + int i = 0; + Preconditions.checkNotNull(bs); + Preconditions.checkNotNull(varOrder); + Preconditions.checkArgument(bs.size() == varOrder.length); + for(final String varName: varOrder) { + final RyaType rt = RdfToRyaConversions.convertValue(bs.getBinding(varName).getValue()); + final byte[][] serializedVal = RyaContext.getInstance().serializeType(rt); + if(i == 0) { + byteArray = Bytes.concat(serializedVal[0], serializedVal[1], DELIM_BYTES); + } else { + byteArray = Bytes.concat(byteArray, serializedVal[0], serializedVal[1], DELIM_BYTES); + } + i++; + } + + return byteArray; + } + + /** + * + * @param row byte rowId (read from Accumulo {@link Key}) + * @param varOrder indicates the order in which values are written in row + * @return {@link BindingSet} formed from serialized values in row and variables in varOrder + * @throws RyaTypeResolverException + */ + public static BindingSet deSerialize(byte[] row, String[] varOrder) throws RyaTypeResolverException { + Preconditions.checkNotNull(row); + Preconditions.checkNotNull(varOrder); + final int lastIndex = Bytes.lastIndexOf(row, DELIM_BYTE); + Preconditions.checkArgument(lastIndex >= 0); + final List<byte[]> byteList = getByteValues(Arrays.copyOf(row, lastIndex), new ArrayList<byte[]>()); + final QueryBindingSet bs = new QueryBindingSet(); + Preconditions.checkArgument(byteList.size() == varOrder.length); + for(int i = 0; i < byteList.size(); i++) { + bs.addBinding(varOrder[i], getValue(byteList.get(i))); + 
} + return bs; + } + + private static List<byte[]> getByteValues(byte[] row, List<byte[]> byteList) { + final int firstIndex = Bytes.indexOf(row, DELIM_BYTE); + if(firstIndex < 0) { + byteList.add(row); + return byteList; + } else { + byteList.add(Arrays.copyOf(row, firstIndex)); + getByteValues(Arrays.copyOfRange(row, firstIndex+1, row.length), byteList); + } + + return byteList; + } + + private static Value getValue(byte[] byteVal) throws RyaTypeResolverException { + + final int typeIndex = Bytes.indexOf(byteVal, TYPE_DELIM_BYTE); + Preconditions.checkArgument(typeIndex >= 0); + final byte[] data = Arrays.copyOf(byteVal, typeIndex); + final byte[] type = Arrays.copyOfRange(byteVal, typeIndex, byteVal.length); + final RyaType rt = RyaContext.getInstance().deserialize(Bytes.concat(data,type)); + return RyaToRdfConversions.convertValue(rt); + + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c12f58f4/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/ExternalTupleSet.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/ExternalTupleSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/ExternalTupleSet.java
index 0e2096d..ddf691d 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/ExternalTupleSet.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/ExternalTupleSet.java
@@ -8,9 +8,9 @@ package mvm.rya.indexing.external.tupleSet;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- *
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,37 +23,51 @@ package mvm.rya.indexing.external.tupleSet;

 import info.aduna.iteration.CloseableIteration;

+import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;

 import org.openrdf.query.BindingSet;
 import org.openrdf.query.QueryEvaluationException;
 import org.openrdf.query.algebra.Projection;
-import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.TupleExpr;
 import org.openrdf.query.algebra.evaluation.impl.ExternalSet;
-import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;

 import com.beust.jcommander.internal.Sets;
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;

 /**
- * Abstract class for an External Tuple Set. This Tuple
+ * This is an abstract class for delegating the evaluation of part
+ * of a SPARQL query to an external source. The {@link TupleExpr} returned by {@link ExternalTupleSet#getTupleExpr()}
+ * represents the SPARQL string that this node evaluates, and the table returned by {@link ExternalTupleSet#getTableVarMap()}
+ * maps the variables of the TupleExpr to the variables stored in the external store (which may be different). The map
+ * returned by {@link ExternalTupleSet#getSupportedVariableOrderMap()} provides a map of all the variable orders in which
+ * data is written to the supporting store, and is useful for determining which {@link BindingSet} can be passed into
+ * {@link ExternalTupleSet#evaluate(BindingSet)}.
+ * */ public abstract class ExternalTupleSet extends ExternalSet { - private Projection tupleExpr; - private Map<String, String> tableVarMap = Maps.newHashMap(); - private Map<String, Set<String>> supportedVarOrders = Maps.newHashMap(); + public static final String VAR_ORDER_DELIM = ";"; + public static final String CONST_PREFIX = "-const-"; + private Projection tupleExpr; + private Map<String, String> tableVarMap = Maps.newHashMap(); //maps vars in tupleExpr to var in stored binding sets + private Map<String, Set<String>> supportedVarOrders = Maps.newHashMap(); //indicates supported var orders - public ExternalTupleSet() { - } - + public ExternalTupleSet(Projection tupleExpr) { + Preconditions.checkNotNull(tupleExpr); this.tupleExpr = tupleExpr; + updateTableVarMap(tupleExpr, tupleExpr); } @Override @@ -79,135 +93,178 @@ public abstract class ExternalTupleSet extends ExternalSet { } public void setProjectionExpr(Projection tupleExpr) { + Preconditions.checkNotNull(tupleExpr); + if(this.tupleExpr == null) { + updateTableVarMap(tupleExpr, tupleExpr); + } else { + updateTableVarMap(tupleExpr, this.tupleExpr); + } this.tupleExpr = tupleExpr; + if (supportedVarOrders.size() != 0) { + updateSupportedVarOrderMap(); + } } - - + public void setTableVarMap(Map<String,String> vars) { + Preconditions.checkNotNull(vars); this.tableVarMap = vars; } - - + public Map<String, String> getTableVarMap() { return this.tableVarMap; } - - + public void setSupportedVariableOrderMap(Map<String, Set<String>> varOrders) { + Preconditions.checkNotNull(varOrders); this.supportedVarOrders = varOrders; } - - + + public void setSupportedVariableOrderMap(List<String> varOrders) { + Preconditions.checkNotNull(varOrders); + this.supportedVarOrders = createSupportedVarOrderMap(varOrders); + } + public Map<String, Set<String>> getSupportedVariableOrderMap() { return supportedVarOrders; } - - - public void updateTupleExp(final Map<Var, Var> oldToNewBindings) { - tupleExpr.visit(new QueryModelVisitorBase<RuntimeException>() { - @Override - public void meet(Var var) { - if (oldToNewBindings.containsKey(var)) { - var.replaceWith(oldToNewBindings.get(var)); - } - } - }); - } @Override public ExternalSet clone() { - ExternalTupleSet clone = (ExternalTupleSet) super.clone(); + final ExternalTupleSet clone = (ExternalTupleSet) super.clone(); clone.tupleExpr = this.tupleExpr.clone(); clone.tableVarMap = Maps.newHashMap(); - for(String s: this.tableVarMap.keySet()) { + for(final String s: this.tableVarMap.keySet()) { clone.tableVarMap.put(s,this.tableVarMap.get(s)); } clone.supportedVarOrders = Maps.newHashMap(); - for(String s: this.supportedVarOrders.keySet()) { + for(final String s: this.supportedVarOrders.keySet()) { clone.supportedVarOrders.put(s,this.supportedVarOrders.get(s)); } return clone; } - - - public Map<String, Set<String>> getSupportedVariableOrders() { - - if (supportedVarOrders.size() != 0) { - return supportedVarOrders; - } else { - Set<String> varSet = Sets.newHashSet(); - String t = ""; + public Map<String, Set<String>> getSupportedVariableOrders() { + return supportedVarOrders; + } - for (String s : tupleExpr.getAssuredBindingNames()) { - if (t.length() == 0) { - t = s; - } else { - t = t + "\u0000" + s; - } + public boolean supportsBindingSet(Set<String> bindingNames) { + final Collection<Set<String>> values = supportedVarOrders.values(); + final Set<String> bNames = Sets.newHashSet(); + final Set<String> bNamesWithConstants = Sets.newHashSet(); - varSet.add(s); - supportedVarOrders.put(t, new 
HashSet<String>(varSet)); + for (final String s : this.getTupleExpr().getAssuredBindingNames()) { + if (bindingNames.contains(s)) { + bNames.add(s); + bNamesWithConstants.add(s); + } else if(s.startsWith(CONST_PREFIX)) { + bNamesWithConstants.add(s); + } + } + return values.contains(bNames) || values.contains(bNamesWithConstants); + } - } + /** + * Call after setting {@link tupleExpr} to map new variables to old -- the + * variables in the binding list of the new tupleExpr (newTuple) must map to + * the corresponding variables in the binding list of the old tupleExpr (oldTuple). + * + * @param newTuple - new project expression + * @param oldTuple - old project expression + */ + private void updateTableVarMap(TupleExpr newTuple, TupleExpr oldTuple) { - return supportedVarOrders; - } - } - - - - - public boolean supportsBindingSet(Set<String> bindingNames) { - - Map<String, Set<String>> varOrderMap = getSupportedVariableOrders(); - String bNames = ""; - - for (String s : tupleExpr.getAssuredBindingNames()) { - if (bindingNames.contains(s)) { - if(bNames.length() == 0) { - bNames = s; - } else { - bNames = bNames + "\u0000"+ s; - } - } - } + final List<String> replacementVars = Lists.newArrayList(newTuple + .getBindingNames()); + final List<String> tableVars = Lists.newArrayList(oldTuple + .getBindingNames()); + + final Map<String, String> tableMap = Maps.newHashMap(); + + for (int i = 0; i < tableVars.size(); i++) { + tableMap.put(replacementVars.get(i), tableVars.get(i)); + } + this.setTableVarMap(tableMap); + } + + /** + * Call after setting {@link tableVarMap} to update the map of supported + * variable orders in terms of the variables in the new tupleExpr. + */ + private void updateSupportedVarOrderMap() { + + Preconditions.checkArgument(supportedVarOrders.size() != 0); + final Map<String, Set<String>> newSupportedOrders = Maps.newHashMap(); + final BiMap<String, String> biMap = HashBiMap.create(tableVarMap) + .inverse(); + Set<String> temp = null; + final Set<String> keys = supportedVarOrders.keySet(); + + for (final String s : keys) { + temp = supportedVarOrders.get(s); + final Set<String> newSet = Sets.newHashSet(); + + for (final String t : temp) { + newSet.add(biMap.get(t)); + } + + final String[] tempStrings = s.split(VAR_ORDER_DELIM); + String v = ""; + for (final String u : tempStrings) { + if (v.length() == 0) { + v = v + biMap.get(u); + } else { + v = v + VAR_ORDER_DELIM + biMap.get(u); + } + } + newSupportedOrders.put(v, newSet); + } + supportedVarOrders = newSupportedOrders; + } + + /** + * @param orders - list of supported variable orders + * @return - map with all possible orders in which results are written to the table + */ + private Map<String, Set<String>> createSupportedVarOrderMap(List<String> orders) { + final Map<String, Set<String>> supportedVars = Maps.newHashMap(); + + for (final String t : orders) { + final String[] tempOrder = t.split(VAR_ORDER_DELIM); + final Set<String> varSet = Sets.newHashSet(); + String u = ""; + for (final String s : tempOrder) { + if(u.length() == 0) { + u = s; + } else{ + u = u+ VAR_ORDER_DELIM + s; + } + varSet.add(s); + supportedVars.put(u, new HashSet<String>(varSet)); + } + } + return supportedVars; + } - return varOrderMap.containsKey(bNames); - } - - - @Override public boolean equals(Object other) { - if (!(other instanceof ExternalTupleSet)) { return false; } else { - - ExternalTupleSet arg = (ExternalTupleSet) other; + final ExternalTupleSet arg = (ExternalTupleSet) other; if (this.getTupleExpr().equals(arg.getTupleExpr())) { return true; } else { return false; } - } - } - - + @Override public int hashCode() { int result = 17; 
result = 31*result + tupleExpr.hashCode(); - return result; } - - - - - - + } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c12f58f4/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/PcjTables.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/PcjTables.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/PcjTables.java new file mode 100644 index 0000000..e422cba --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/PcjTables.java @@ -0,0 +1,800 @@ +package mvm.rya.indexing.external.tupleSet; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; + +import javax.annotation.ParametersAreNonnullByDefault; +import javax.annotation.concurrent.Immutable; + +import mvm.rya.api.resolver.RyaTypeResolverException; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.core.client.ConditionalWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.lexicoder.ListLexicoder; +import org.apache.accumulo.core.client.lexicoder.LongLexicoder; +import org.apache.accumulo.core.client.lexicoder.StringLexicoder; +import org.apache.accumulo.core.data.Condition; +import org.apache.accumulo.core.data.ConditionalMutation; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.TupleQuery; +import org.openrdf.query.TupleQueryResult; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.RepositoryException; + +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; + +/** + * Functions that create and maintain the PCJ tables that are used by Rya. + */ +@ParametersAreNonnullByDefault +public class PcjTables { + private static final Logger log = Logger.getLogger(PcjTables.class); + + /** + * The Row ID of all {@link PcjMetadata} entries that are stored in Accumulo. 
+     */
+    private static final Text PCJ_METADATA_ROW_ID = new Text("pcjMetadata");
+
+    /**
+     * The Column Family for all PCJ metadata entries.
+     */
+    private static final Text PCJ_METADATA_FAMILY = new Text("metadata");
+
+    /**
+     * The Column Qualifier for the SPARQL query a PCJ is built from.
+     */
+    private static final Text PCJ_METADATA_SPARQL_QUERY = new Text("sparql");
+
+    /**
+     * The Column Qualifier for the cardinality of a PCJ.
+     */
+    private static final Text PCJ_METADATA_CARDINALITY = new Text("cardinality");
+
+    /**
+     * The Column Qualifier for the various variable orders a PCJ's results are written to.
+     */
+    private static final Text PCJ_METADATA_VARIABLE_ORDERS = new Text("variableOrders");
+
+    // Lexicoders used to read/write PcjMetadata to/from Accumulo.
+    private static final LongLexicoder longLexicoder = new LongLexicoder();
+    private static final StringLexicoder stringLexicoder = new StringLexicoder();
+    private static final ListLexicoder<String> listLexicoder = new ListLexicoder<>(stringLexicoder);
+
+    /**
+     * An ordered list of {@link BindingSet} variable names. These are used to
+     * specify the order {@link Binding}s within the set are serialized to Accumulo.
+     * This order affects which rows a prefix scan will hit.
+     */
+    @Immutable
+    @ParametersAreNonnullByDefault
+    public static final class VariableOrder implements Iterable<String> {
+
+        public static final String VAR_ORDER_DELIM = ";";
+
+        private final ImmutableList<String> variableOrder;
+
+        /**
+         * Constructs an instance of {@link VariableOrder}.
+         *
+         * @param varOrder - An ordered array of Binding Set variables. (not null)
+         */
+        public VariableOrder(String... varOrder) {
+            checkNotNull(varOrder);
+            this.variableOrder = ImmutableList.copyOf(varOrder);
+        }
+
+        /**
+         * Constructs an instance of {@link VariableOrder}.
+         *
+         * @param varOrderString - The String representation of a VariableOrder. (not null)
+         */
+        public VariableOrder(String varOrderString) {
+            checkNotNull(varOrderString);
+            this.variableOrder = ImmutableList.copyOf( varOrderString.split(VAR_ORDER_DELIM) );
+        }
+
+        /**
+         * @return An ordered list of Binding Set variables.
+         */
+        public ImmutableList<String> getVariableOrders() {
+            return variableOrder;
+        }
+
+        /**
+         * @return The variable order as an ordered array of Strings. This array is mutable.
+         */
+        public String[] toArray() {
+            String[] array = new String[ variableOrder.size() ];
+            return variableOrder.toArray( array );
+        }
+
+        @Override
+        public String toString() {
+            return Joiner.on(VAR_ORDER_DELIM).join(variableOrder);
+        }
+
+        @Override
+        public int hashCode() {
+            return variableOrder.hashCode();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if(this == o) {
+                return true;
+            } else if(o instanceof VariableOrder) {
+                VariableOrder varOrder = (VariableOrder) o;
+                return variableOrder.equals( varOrder.variableOrder );
+            }
+            return false;
+        }
+
+        @Override
+        public Iterator<String> iterator() {
+            return variableOrder.iterator();
+        }
+    }
+
+    /**
+     * Metadata that is stored in a PCJ table about the results that are stored within it.
+     */
+    @Immutable
+    @ParametersAreNonnullByDefault
+    public static final class PcjMetadata {
+        private final String sparql;
+        private final long cardinality;
+        private final ImmutableSet<VariableOrder> varOrders;
+
+        /**
+         * Constructs an instance of {@link PcjMetadata}.
+         *
+         * @param sparql - The SPARQL query this PCJ solves. (not null)
+         * @param cardinality - The number of results the PCJ has. 
(>= 0) + * @param varOrders - Strings that describe each of the variable orders + * the results are stored in. (not null) + */ + public PcjMetadata(final String sparql, final long cardinality, final Collection<VariableOrder> varOrders) { + this.sparql = checkNotNull(sparql); + this.varOrders = ImmutableSet.copyOf( checkNotNull(varOrders) ); + + checkArgument(cardinality >= 0, "Cardinality of a PCJ must be >= 0. Was: " + cardinality); + this.cardinality = cardinality; + } + + /** + * @return The SPARQL query this PCJ solves. + */ + public String getSparql() { + return sparql; + } + + /** + * @return The number of results the PCJ has. + */ + public long getCardinality() { + return cardinality; + } + + /** + * @return Strings that describe each of the variable orders the results are stored in. + */ + public ImmutableSet<VariableOrder> getVarOrders() { + return varOrders; + } + + /** + * Updates the cardinality of a {@link PcjMetadata} by a {@code delta}. + * + * @param metadata - The PCJ metadata to update. (not null) + * @param delta - How much the cardinality of the PCJ has changed. + * @return A new instance of {@link PcjMetadata} with the new cardinality. + */ + public static PcjMetadata updateCardinality(final PcjMetadata metadata, final int delta) { + checkNotNull(metadata); + return new PcjMetadata(metadata.sparql, metadata.cardinality + delta, metadata.varOrders); + } + + @Override + public int hashCode() { + return Objects.hash(sparql, cardinality, varOrders); + } + + @Override + public boolean equals(final Object o) { + if(this == o) { + return true; + } else if(o instanceof PcjMetadata) { + final PcjMetadata metadata = (PcjMetadata) o; + return new EqualsBuilder() + .append(sparql, metadata.sparql) + .append(cardinality, metadata.cardinality) + .append(varOrders, metadata.varOrders) + .build(); + } + return false; + } + } + + /** + * Creates Accumulo table names that may be recognized by Rya as a table that + * holds the results of a Precomputed Join. + */ + public static class PcjTableNameFactory { + + /** + * Creates an Accumulo table names that may be recognized by Rya as a table + * that holds the results of a Precomputed Join. + * </p> + * An Accumulo cluster may host more than one Rya instance. To ensure each + * Rya instance's RDF Triples are segregated from each other, they are stored + * within different Accumulo tables. This is accomplished by prepending a + * {@code tablePrefix} to every table that is owned by a Rya instance. Each + * PCJ table is owned by a specific Rya instance, so it too must be prepended + * with the instance's {@code tablePrefix}. + * </p> + * When Rya scans for PCJ tables that it may use when creating execution plans, + * it looks for any table in Accumulo that has a name starting with its + * {@code tablePrefix} immediately followed by "INDEX". Anything following + * that portion of the table name is just a unique identifier for the SPARQL + * query that is being precomputed. Here's an example of what a table name + * may look like: + * <pre> + * demo_INDEX_QUERY:c8f5367c-1660-4210-a7cb-681ed004d2d9 + * </pre> + * The "demo_INDEX" portion indicates this table is a PCJ table for the "demo_" + * instance of Rya. The "_QUERY:c8f5367c-1660-4210-a7cb-681ed004d2d9" portion + * could be anything at all that uniquely identifies the query that is being updated. + * + * @param tablePrefix - The Rya instance's table prefix. (not null) + * @param uniqueId - The unique portion of the Rya PCJ table name. 
(not null) + * @return A Rya PCJ table name build using the provided values. + */ + public String makeTableName(final String tablePrefix, final String uniqueId) { + return tablePrefix + "INDEX_" + uniqueId; + } + } + + /** + * Create alternative variable orders for a SPARQL query based on + * the original ordering of its results. + */ + public static interface PcjVarOrderFactory { + + /** + * Create alternative variable orders for a SPARQL query based on + * the original ordering of its results. + * + * @param varOrder - The initial variable order of a SPARQL query. (not null) + * @return A set of alternative variable orders for the original. + */ + public Set<VariableOrder> makeVarOrders(VariableOrder varOrder); + } + + /** + * Shifts the variables to the left so that each variable will appear at + * the head of the varOrder once. + */ + @ParametersAreNonnullByDefault + public static class ShiftVarOrderFactory implements PcjVarOrderFactory { + @Override + public Set<VariableOrder> makeVarOrders(final VariableOrder varOrder) { + Set<VariableOrder> varOrders = new HashSet<>(); + + final List<String> cyclicBuff = Lists.newArrayList( varOrder.getVariableOrders() ); + final String[] varOrderBuff = new String[ cyclicBuff.size() ]; + + for(int shift = 0; shift < cyclicBuff.size(); shift++) { + // Build a variable order. + for(int i = 0; i < cyclicBuff.size(); i++) { + varOrderBuff[i] = cyclicBuff.get(i); + } + varOrders.add( new VariableOrder(varOrderBuff) ); + + // Shift the order the variables will appear in the cyclic buffer. + cyclicBuff.add( cyclicBuff.remove(0) ); + } + + return varOrders; + } + } + + /** + * Indicates one of the {@link PcjTables} functions has failed to perform its task. + */ + public static class PcjException extends Exception { + private static final long serialVersionUID = 1L; + + /** + * Constructs an instance of {@link PcjException}. + * + * @param message - Describes why the exception is being thrown. + */ + public PcjException(String message) { + super(message); + } + + /** + * Constructs an instance of {@link PcjException}. + * + * @param message - Describes why the exception is being thrown. + * @param cause - The exception that caused this one to be thrown. + */ + public PcjException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Create a new PCJ table within an Accumulo instance for a SPARQL query. + * For example, calling the function like this: + * <pre> + * PcjTables.createPcjTable( + * accumuloConn, + * + * "foo_INDEX_query1234", + * + * Sets.newHashSet( + * new VariableOrder("city;worker;customer"), + * new VariableOrder("worker;customer;city") , + * new VariableOrder("customer;city;worker")), + * + * "SELECT ?customer ?worker ?city { " + + * "?customer <http://talksTo> ?worker. " + + * "?worker <http://livesIn> ?city. " + + * "?worker <http://worksAt> <http://Home>. " + + * "}"); + * </pre> + * </p> + * Will result in an Accumulo table named "foo_INDEX_query1234" with the following entries: + * <table border="1" style="width:100%"> + * <tr> <th>Row ID</td> <th>Column</td> <th>Value</td> </tr> + * <tr> <td>pcjMetadata</td> <td>metadata:sparql</td> <td> ... UTF-8 bytes encoding the query string ... 
</td> </tr> + * <tr> <td>pcjMetadata</td> <td>metadata:cardinality</td> <td> The query's cardinality </td> </tr> + * <tr> <td>pcjMetadata</td> <td>metadata:variableOrders</td> <td> The variable orders the results are written to </td> </tr> + * </table> + * + * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null) + * @param pcjTableName - The name of the table that will be created. (not null) + * @param varOrders - The variable orders the results within the table will be written to. (not null) + * @param sparql - The query this table's results solves. (not null) + * @throws PcjException Could not create a new PCJ table either because Accumulo + * would not let us create it or the PCJ metadata was not able to be written to it. + */ + public void createPcjTable( + final Connector accumuloConn, + final String pcjTableName, + final Set<VariableOrder> varOrders, + final String sparql) throws PcjException { + checkNotNull(accumuloConn); + checkNotNull(pcjTableName); + checkNotNull(varOrders); + checkNotNull(sparql); + + final TableOperations tableOps = accumuloConn.tableOperations(); + if(!tableOps.exists(pcjTableName)) { + try { + // Create the new table in Accumulo. + tableOps.create(pcjTableName); + + // Write the PCJ Metadata to the newly created table. + final PcjMetadata pcjMetadata = new PcjMetadata(sparql, 0L, varOrders); + final List<Mutation> mutations = makeWriteMetadataMutations(pcjMetadata); + + final BatchWriter writer = accumuloConn.createBatchWriter(pcjTableName, new BatchWriterConfig()); + writer.addMutations(mutations); + writer.close(); + } catch (final TableExistsException e) { + log.warn("Something else just created the Rya PCJ export table named '" + pcjTableName + + "'. This is unexpected, but we will continue as normal."); + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { + throw new PcjException("Could not create a new PCJ named: " + pcjTableName, e); + } + } + } + + /** + * Create the {@link Mutation}s required to write a {@link PCJMetadata} object + * to an Accumulo table. + * + * @param metadata - The metadata to write. (not null) + * @return An ordered list of mutations that write the metadata to an Accumulo table. + */ + private static List<Mutation> makeWriteMetadataMutations(final PcjMetadata metadata) { + checkNotNull(metadata); + + final List<Mutation> mutations = new LinkedList<>(); + + // SPARQL Query + Mutation mutation = new Mutation(PCJ_METADATA_ROW_ID); + Value query = new Value( stringLexicoder.encode(metadata.getSparql()) ); + mutation.put(PCJ_METADATA_FAMILY, PCJ_METADATA_SPARQL_QUERY, query); + mutations.add(mutation); + + // Cardinality + mutation = new Mutation(PCJ_METADATA_ROW_ID); + Value cardinality = new Value( longLexicoder.encode(new Long(metadata.getCardinality())) ); + mutation.put(PCJ_METADATA_FAMILY, PCJ_METADATA_CARDINALITY, cardinality); + mutations.add(mutation); + + // Variable Orders + List<String> varOrderStrings = new ArrayList<>(); + for(VariableOrder varOrder : metadata.getVarOrders()) { + varOrderStrings.add( varOrder.toString() ); + } + + mutation = new Mutation(PCJ_METADATA_ROW_ID); + Value variableOrders = new Value( listLexicoder.encode(varOrderStrings) ); + mutation.put(PCJ_METADATA_FAMILY, PCJ_METADATA_VARIABLE_ORDERS, variableOrders); + mutations.add(mutation); + + return mutations; + } + + /** + * Fetch the {@link PCJMetadata} from an Accumulo table. + * <p> + * This method assumes the PCJ table has already been created. 
+     *
+     * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null)
+     * @param pcjTableName - The name of the table that will be searched. (not null)
+     * @return The PCJ Metadata that has been stored in the PCJ Table.
+     * @throws PcjException The PCJ Table does not exist.
+     */
+    public PcjMetadata getPcjMetadata(
+            final Connector accumuloConn,
+            final String pcjTableName) throws PcjException {
+        checkNotNull(accumuloConn);
+        checkNotNull(pcjTableName);
+
+        try {
+            // Create an Accumulo scanner that iterates through the metadata entries.
+            Scanner scanner = accumuloConn.createScanner(pcjTableName, new Authorizations());
+            final Iterator<Entry<Key, Value>> entries = scanner.iterator();
+
+            // No metadata has been stored in the table yet.
+            if(!entries.hasNext()) {
+                throw new PcjException("Could not find any PCJ metadata in the table named: " + pcjTableName);
+            }
+
+            // Fetch the metadata from the entries. Assuming they all have the same cardinality and sparql query.
+            String sparql = null;
+            Long cardinality = null;
+            final Set<VariableOrder> varOrders = new HashSet<>();
+
+            while(entries.hasNext()) {
+                final Entry<Key, Value> entry = entries.next();
+                Text columnQualifier = entry.getKey().getColumnQualifier();
+                byte[] value = entry.getValue().get();
+
+                if(columnQualifier.equals(PCJ_METADATA_SPARQL_QUERY)) {
+                    sparql = stringLexicoder.decode(value);
+                } else if(columnQualifier.equals(PCJ_METADATA_CARDINALITY)) {
+                    cardinality = longLexicoder.decode(value);
+                } else if(columnQualifier.equals(PCJ_METADATA_VARIABLE_ORDERS)) {
+                    for(String varOrderStr : listLexicoder.decode(value)) {
+                        varOrders.add( new VariableOrder(varOrderStr) );
+                    }
+                }
+            }
+
+            return new PcjMetadata(sparql, cardinality, varOrders);
+
+        } catch (TableNotFoundException e) {
+            throw new PcjException("Could not fetch the PCJ metadata because the PCJ table does not exist.", e);
+        }
+    }
+
+    /**
+     * Add a collection of results to a PCJ table. The table's cardinality will
+     * be updated to include the new results.
+     * <p>
+     * This method assumes the PCJ table has already been created.
+     *
+     * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null)
+     * @param pcjTableName - The name of the PCJ table that will receive the results. (not null)
+     * @param results - Binding sets that will be written to the PCJ table. (not null)
+     * @throws PcjException The provided PCJ table doesn't exist, is missing the
+     *   PCJ metadata, or the result could not be written to it.
+     */
+    public void addResults(
+            final Connector accumuloConn,
+            final String pcjTableName,
+            final Collection<BindingSet> results) throws PcjException {
+        checkNotNull(accumuloConn);
+        checkNotNull(pcjTableName);
+        checkNotNull(results);
+
+        // Write a result to each of the variable orders that are in the table.
+        writeResults(accumuloConn, pcjTableName, results);
+
+        // Increment the cardinality of the query by the number of new results.
+        updateCardinality(accumuloConn, pcjTableName, results.size());
+    }
+
+    /**
+     * Add a collection of results to a specific PCJ table.
+     *
+     * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null)
+     * @param pcjTableName - The name of the PCJ table that will receive the results. (not null)
+     * @param results - Binding sets that will be written to the PCJ table. (not null)
+     * @throws PcjException The provided PCJ table doesn't exist, is missing the
+     *   PCJ metadata, or the result could not be written to it. 
+ */ + private void writeResults( + final Connector accumuloConn, + final String pcjTableName, + final Collection<BindingSet> results) throws PcjException { + checkNotNull(accumuloConn); + checkNotNull(pcjTableName); + checkNotNull(results); + + // Fetch the variable orders from the PCJ table. + PcjMetadata metadata = getPcjMetadata(accumuloConn, pcjTableName); + + // Write each result formatted using each of the variable orders. + BatchWriter writer = null; + try { + writer = accumuloConn.createBatchWriter(pcjTableName, new BatchWriterConfig()); + for(BindingSet result : results) { + Set<Mutation> addResultMutations = makeWriteResultMutations(metadata.getVarOrders(), result); + writer.addMutations( addResultMutations ); + } + } catch (TableNotFoundException | MutationsRejectedException e) { + throw new PcjException("Could not add results to the PCJ table named: " + pcjTableName, e); + } finally { + if(writer != null) { + try { + writer.close(); + } catch (MutationsRejectedException e) { + throw new PcjException("Could not add results to a PCJ table because some of the mutations were rejected.", e); + } + } + } + } + + /** + * Create the {@link Mutations} required to write a new {@link BindingSet} + * to a PCJ table for each {@link VariableOrder} that is provided. + * + * @param varOrders - The variables orders the result will be written to. (not null) + * @param result - A new PCJ result. (not null) + * @return Mutation that will write the result to a PCJ table. + * @throws PcjException The binding set could not be encoded. + */ + private static Set<Mutation> makeWriteResultMutations( + final Set<VariableOrder> varOrders, + final BindingSet result) throws PcjException { + checkNotNull(varOrders); + checkNotNull(result); + + Set<Mutation> mutations = new HashSet<>(); + + for(final VariableOrder varOrder : varOrders) { + try { + // Serialize the result to the variable order. + byte[] serializedResult = AccumuloPcjSerializer.serialize(result, varOrder.toArray()); + + // Row ID = binding set values, Column Family = variable order of the binding set. + Mutation addResult = new Mutation(serializedResult); + addResult.put(varOrder.toString(), "", ""); + mutations.add(addResult); + } catch(RyaTypeResolverException e) { + throw new PcjException("Could not serialize a result.", e); + } + } + + return mutations; + } + + /** + * Update the cardinality of a PCJ by a {@code delta}. + * + * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null) + * @param pcjTableName - The name of the PCJ table that will have its cardinality updated. (not null) + * @param delta - How much the cardinality will change. + * @throws PcjException The cardinality could not be updated. + */ + private void updateCardinality( + final Connector accumuloConn, + final String pcjTableName, + final long delta) throws PcjException { + checkNotNull(accumuloConn); + checkNotNull(pcjTableName); + + ConditionalWriter conditionalWriter = null; + try { + conditionalWriter = accumuloConn.createConditionalWriter(pcjTableName, new ConditionalWriterConfig()); + + boolean updated = false; + while(!updated) { + // Write the conditional update request to Accumulo. + long cardinality = getPcjMetadata(accumuloConn, pcjTableName).getCardinality(); + ConditionalMutation mutation = makeUpdateCardinalityMutation(cardinality, delta); + ConditionalWriter.Result result = conditionalWriter.write(mutation); + + // Interpret the result. 
+ switch(result.getStatus()) { + case ACCEPTED: + updated = true; + break; + case REJECTED: + break; + case UNKNOWN: + // We do not know if the mutation succeeded. At best, we can hope the metadata hasn't been updated + // since we originally fetched it and try again. Otherwise, continue forwards as if it worked. It's + // okay if this number is slightly off. + long newCardinality = getPcjMetadata(accumuloConn, pcjTableName).getCardinality(); + if(newCardinality != cardinality) { + updated = true; + } + break; + case VIOLATED: + throw new PcjException("The cardinality could not be updated because the commit violated a table constraint."); + case INVISIBLE_VISIBILITY: + throw new PcjException("The condition contains a visibility the updater can not satisfy."); + } + } + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { + throw new PcjException("Could not update the cardinality value of the PCJ Table named: " + pcjTableName, e); + } finally { + if(conditionalWriter != null) { + conditionalWriter.close(); + } + } + } + + /** + * Creates a {@link ConditionalMutation} that only updates the cardinality + * of the PCJ table if the old value has not changed by the time this mutation + * is committed to Accumulo. + * + * @param current - The current cardinality value. + * @param delta - How much the cardinality will change. + * @return The mutation that will perform the conditional update. + */ + private static ConditionalMutation makeUpdateCardinalityMutation(long current, long delta) { + // Try to update the cardinality by the delta. + ConditionalMutation mutation = new ConditionalMutation(PCJ_METADATA_ROW_ID); + Condition lastCardinalityStillCurrent = new Condition( + PCJ_METADATA_FAMILY, + PCJ_METADATA_CARDINALITY); + + // Require the old cardinality to be the value we just read. + byte[] currentCardinalityBytes = longLexicoder.encode( current ); + lastCardinalityStillCurrent.setValue( currentCardinalityBytes ); + mutation.addCondition(lastCardinalityStillCurrent); + + // If that is the case, then update to the new value. + Value newCardinality = new Value( longLexicoder.encode(current + delta) ); + mutation.put(PCJ_METADATA_FAMILY, PCJ_METADATA_CARDINALITY, newCardinality); + return mutation; + } + + /** + * Scan Rya for results that solve the PCJ's query and store them in the PCJ table. + * <p> + * This method assumes the PCJ table has already been created. + * + * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null) + * @param pcjTableName - The name of the PCJ table that will receive the results. (not null) + * @param ryaConn - A connection to the Rya store that will be queried to find results. (not null) + * @throws PcjException If results could not be written to the PCJ table, + * the PCJ table does not exist, or the query that is being execute + * was malformed. + */ + public void populatePcj( + final Connector accumuloConn, + final String pcjTableName, + final RepositoryConnection ryaConn) throws PcjException { + checkNotNull(accumuloConn); + checkNotNull(pcjTableName); + checkNotNull(ryaConn); + + try { + // Fetch the query that needs to be executed from the PCJ table. + PcjMetadata pcjMetadata = getPcjMetadata(accumuloConn, pcjTableName); + String sparql = pcjMetadata.getSparql(); + + // Query Rya for results to the SPARQL query. 
+            TupleQuery query = ryaConn.prepareTupleQuery(QueryLanguage.SPARQL, sparql);
+            TupleQueryResult results = query.evaluate();
+
+            // Load batches of 1000 of them at a time into the PCJ table
+            Set<BindingSet> batch = new HashSet<>(1000);
+            while(results.hasNext()) {
+                batch.add( results.next() );
+
+                if(batch.size() == 1000) {
+                    addResults(accumuloConn, pcjTableName, batch);
+                    batch.clear();
+                }
+            }
+
+            if(!batch.isEmpty()) {
+                addResults(accumuloConn, pcjTableName, batch);
+            }
+
+        } catch (RepositoryException | MalformedQueryException | QueryEvaluationException e) {
+            throw new PcjException("Could not populate a PCJ table with Rya results for the table named: " + pcjTableName, e);
+        }
+    }
+
+    private static final PcjVarOrderFactory DEFAULT_VAR_ORDER_FACTORY = new ShiftVarOrderFactory();
+
+    /**
+     * Creates a new PCJ Table in Accumulo and populates it by scanning an
+     * instance of Rya for historic matches.
+     * <p>
+     * If any portion of this operation fails along the way, the partially
+     * created PCJ table will be left in Accumulo.
+     *
+     * @param ryaConn - Connects to the Rya that will be scanned. (not null)
+     * @param accumuloConn - Connects to the Accumulo that hosts the PCJ results. (not null)
+     * @param pcjTableName - The name of the PCJ table that will be created. (not null)
+     * @param sparql - The SPARQL query whose results will be loaded into the table. (not null)
+     * @param resultVariables - The variables that are included in the query's resulting binding sets. (not null)
+     * @param pcjVarOrderFactory - An optional factory that indicates the various variable orders
+     *   the results will be stored in. If one is not provided, then {@link ShiftVarOrderFactory}
+     *   is used by default. (not null)
+     * @throws PcjException The PCJ table could not be created or the values from
+     *   Rya were not able to be loaded into it.
+     */
+    public void createAndPopulatePcj(
+            final RepositoryConnection ryaConn,
+            final Connector accumuloConn,
+            final String pcjTableName,
+            final String sparql,
+            final String[] resultVariables,
+            final Optional<PcjVarOrderFactory> pcjVarOrderFactory) throws PcjException {
+        checkNotNull(ryaConn);
+        checkNotNull(accumuloConn);
+        checkNotNull(pcjTableName);
+        checkNotNull(sparql);
+        checkNotNull(resultVariables);
+        checkNotNull(pcjVarOrderFactory);
+
+        // Create the PCJ's variable orders.
+        PcjVarOrderFactory varOrderFactory = pcjVarOrderFactory.or(DEFAULT_VAR_ORDER_FACTORY);
+        Set<VariableOrder> varOrders = varOrderFactory.makeVarOrders( new VariableOrder(resultVariables) );
+
+        // Create the PCJ table in Accumulo.
+        createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
+
+        // Load historic matches from Rya into the PCJ table. 
+        populatePcj(accumuloConn, pcjTableName, ryaConn);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c12f58f4/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/SimpleExternalTupleSet.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/SimpleExternalTupleSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/SimpleExternalTupleSet.java
index 44925ca..ca97014 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/SimpleExternalTupleSet.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/SimpleExternalTupleSet.java
@@ -8,9 +8,9 @@ package mvm.rya.indexing.external.tupleSet;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- *
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,9 +19,14 @@ package mvm.rya.indexing.external.tupleSet;
  * under the License.
  */
+import info.aduna.iteration.CloseableIteration;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
-import info.aduna.iteration.CloseableIteration;
+import mvm.rya.indexing.external.PrecompJoinOptimizer;

 import org.openrdf.query.BindingSet;
 import org.openrdf.query.QueryEvaluationException;
@@ -29,60 +34,79 @@ import org.openrdf.query.algebra.Projection;
 import org.openrdf.query.algebra.QueryModelVisitor;

 import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;

+/**
+ * This is a testing class to create mock pre-computed join nodes in order to
+ * test the {@link PrecompJoinOptimizer} for query planning. 
+ * + */ +public class SimpleExternalTupleSet extends ExternalTupleSet { + public SimpleExternalTupleSet(Projection tuple) { + this.setProjectionExpr(tuple); + setSupportedVarOrders(); + } + private void setSupportedVarOrders() { + final Set<String> varSet = Sets.newHashSet(); + final Map<String, Set<String>> supportedVarOrders = new HashMap<>(); + String t = ""; -public class SimpleExternalTupleSet extends ExternalTupleSet { + for (final String s : this.getTupleExpr().getAssuredBindingNames()) { + if (t.length() == 0) { + t = s; + } else { + t = t + VAR_ORDER_DELIM + s; + } - - - public SimpleExternalTupleSet(Projection tuple) { - super(); - this.setProjectionExpr(tuple); - + varSet.add(s); + supportedVarOrders.put(t, new HashSet<String>(varSet)); + + } + this.setSupportedVariableOrderMap(supportedVarOrders); } - + @Override public <X extends Exception> void visit(QueryModelVisitor<X> visitor) - throws X - { - visitor.meetOther(this); + throws X { + visitor.meetOther(this); + } + + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate( + BindingSet bindings) throws QueryEvaluationException { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getSignature() { + return "(SimpleExternalTupleSet) " + + Joiner.on(", ") + .join(this.getTupleExpr().getProjectionElemList() + .getElements()).replaceAll("\\s+", " "); + + } + + @Override + public boolean equals(Object other) { + + if (!(other instanceof SimpleExternalTupleSet)) { + return false; + } else { + + final SimpleExternalTupleSet arg = (SimpleExternalTupleSet) other; + if (this.getTupleExpr().equals(arg.getTupleExpr())) { + return true; + } else { + return false; + } + } - - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) - throws QueryEvaluationException { - // TODO Auto-generated method stub - return null; - } - - @Override - public String getSignature() { - return "(SimpleExternalTupleSet) " - + Joiner.on(", ").join(this.getTupleExpr().getProjectionElemList().getElements()).replaceAll("\\s+", " "); - - } - - @Override - public boolean equals(Object other) { - - if (!(other instanceof SimpleExternalTupleSet)) { - return false; - } else { - - SimpleExternalTupleSet arg = (SimpleExternalTupleSet) other; - if (this.getTupleExpr().equals(arg.getTupleExpr())) { - return true; - } else { - return false; - } - - } - - } - + + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c12f58f4/extras/indexing/src/test/java/mvm/rya/indexing/IndexPlanValidator/GeneralizedExternalProcessorTest.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/IndexPlanValidator/GeneralizedExternalProcessorTest.java b/extras/indexing/src/test/java/mvm/rya/indexing/IndexPlanValidator/GeneralizedExternalProcessorTest.java index bfea0bd..deeaa63 100644 --- a/extras/indexing/src/test/java/mvm/rya/indexing/IndexPlanValidator/GeneralizedExternalProcessorTest.java +++ b/extras/indexing/src/test/java/mvm/rya/indexing/IndexPlanValidator/GeneralizedExternalProcessorTest.java @@ -8,9 +8,9 @@ package mvm.rya.indexing.IndexPlanValidator; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,18 +19,14 @@ package mvm.rya.indexing.IndexPlanValidator; * under the License. */ - -import static org.junit.Assert.*; - import java.util.ArrayList; import java.util.List; import java.util.Set; -import junit.framework.Assert; -import mvm.rya.indexing.external.ExternalProcessor; import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; import mvm.rya.indexing.external.tupleSet.SimpleExternalTupleSet; +import org.junit.Assert; import org.junit.Test; import org.openrdf.query.algebra.Projection; import org.openrdf.query.algebra.TupleExpr; @@ -42,284 +38,136 @@ import com.google.common.collect.Sets; public class GeneralizedExternalProcessorTest { - private String q7 = ""// - + "SELECT ?s ?t ?u " // - + "{" // - + " ?s a ?t ."// - + " ?t <http://www.w3.org/2000/01/rdf-schema#label> ?u ."// - + " ?u <uri:talksTo> ?s . "// - + "}";// - - - private String q8 = ""// - + "SELECT ?f ?m ?d ?e ?l ?c ?n ?o ?p ?a ?h ?r " // - + "{" // - + " ?h <http://www.w3.org/2000/01/rdf-schema#label> ?r ."// - + " ?f a ?m ."// - + " ?p <uri:talksTo> ?n . "// - + " ?e a ?l ."// - + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?p ."// - + " ?d <uri:talksTo> ?f . "// - + " ?c <uri:talksTo> ?e . "// - + " ?n a ?o ."// - + " ?a a ?h ."// - + " ?m <http://www.w3.org/2000/01/rdf-schema#label> ?d ."// - + " ?l <http://www.w3.org/2000/01/rdf-schema#label> ?c ."// - + " ?r <uri:talksTo> ?a . "// - + "}";// - - - - - private String q11 = ""// - + "SELECT ?f ?m ?d ?e ?l ?c ?n ?o ?p ?a ?h ?r ?x ?y ?w ?t ?duck ?chicken ?pig ?rabbit " // - + "{" // - + " ?w a ?t ."// - + " ?x a ?y ."// - + " ?duck a ?chicken ."// - + " ?pig a ?rabbit ."// - + " ?h <http://www.w3.org/2000/01/rdf-schema#label> ?r ."// - + " ?f a ?m ."// - + " ?p <uri:talksTo> ?n . "// - + " ?e a ?l ."// - + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?p ."// - + " ?d <uri:talksTo> ?f . "// - + " ?c <uri:talksTo> ?e . "// - + " ?n a ?o ."// - + " ?a a ?h ."// - + " ?m <http://www.w3.org/2000/01/rdf-schema#label> ?d ."// - + " ?l <http://www.w3.org/2000/01/rdf-schema#label> ?c ."// - + " ?r <uri:talksTo> ?a . "// - + "}";// - - - private String q12 = ""// - + "SELECT ?b ?p ?dog ?cat " // - + "{" // - + " ?b a ?p ."// - + " ?dog a ?cat. "// - + "}";// - - - - private String q13 = ""// - + "SELECT ?f ?m ?d ?e ?l ?c ?n ?o ?p ?a ?h ?r ?x ?y ?w ?t ?duck ?chicken ?pig ?rabbit ?dick ?jane ?betty " // - + "{" // - + " ?w a ?t ."// - + " ?x a ?y ."// - + " ?duck a ?chicken ."// - + " ?pig a ?rabbit ."// - + " ?h <http://www.w3.org/2000/01/rdf-schema#label> ?r ."// - + " ?f a ?m ."// - + " ?p <uri:talksTo> ?n . "// - + " ?e a ?l ."// - + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?p ."// - + " ?d <uri:talksTo> ?f . "// - + " ?c <uri:talksTo> ?e . "// - + " ?n a ?o ."// - + " ?a a ?h ."// - + " ?m <http://www.w3.org/2000/01/rdf-schema#label> ?d ."// - + " ?l <http://www.w3.org/2000/01/rdf-schema#label> ?c ."// - + " ?r <uri:talksTo> ?a . "// - + " ?dick <uri:talksTo> ?jane . "// - + " ?jane <uri:talksTo> ?betty . 
"// - + "}";// - - private String q14 = ""// - + "SELECT ?f ?m ?d ?e ?l ?c ?n ?o ?p ?a ?h ?r ?x ?y ?w ?t ?duck ?chicken ?pig ?rabbit " // - + "{" // - + " ?w a ?t ."// - + " ?x a ?y ."// - + " ?duck a ?chicken ."// - + " ?pig a ?rabbit ."// - + " ?h <http://www.w3.org/2000/01/rdf-schema#label> ?r ."// - + " ?f a ?m ."// - + " ?p <uri:talksTo> ?n . "// - + " ?e a ?l ."// - + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?p ."// - + " ?d <uri:talksTo> ?f . "// - + " ?c <uri:talksTo> ?e . "// - + " ?n a ?o ."// - + " ?a a ?h ."// - + " ?m <http://www.w3.org/2000/01/rdf-schema#label> ?d ."// - + " ?l <http://www.w3.org/2000/01/rdf-schema#label> ?c ."// - + " ?r <uri:talksTo> ?a . "// - + " ?d <uri:talksTo> ?a . "// - + "}";// - - - private String q15 = ""// - + "SELECT ?f ?m ?d ?e ?l ?c " // - + "{" // - + " ?f a ?m ."// - + " ?e a ?l ."// - + " ?d <uri:talksTo> ?f . "// - + " ?c <uri:talksTo> ?e . "// - + " ?m <http://www.w3.org/2000/01/rdf-schema#label> ?d ."// - + " ?l <http://www.w3.org/2000/01/rdf-schema#label> ?c ."// - + "}";// - - private String q16 = ""// - + "SELECT ?f ?m ?d ?e ?l ?c " // - + "{" // - + " ?d <uri:talksTo> ?f . "// - + " ?c <uri:talksTo> ?e . "// - + " ?m <http://www.w3.org/2000/01/rdf-schema#label> ?d ."// - + " ?l <http://www.w3.org/2000/01/rdf-schema#label> ?c ."// - + "}";// - - private String q17 = ""// - + "SELECT ?dog ?cat ?chicken " // - + "{" // - + " ?chicken <uri:talksTo> ?dog . "// - + " ?cat <http://www.w3.org/2000/01/rdf-schema#label> ?chicken ."// - + "}";// - - private String q18 = ""// - + "SELECT ?dog ?chicken " // - + "{" // - + " ?chicken <uri:talksTo> ?dog . "// - + "}";// - - private String q19 = ""// - + "SELECT ?cat ?chicken " // - + "{" // - + " ?cat <http://www.w3.org/2000/01/rdf-schema#label> ?chicken ."// - + "}";// - - - - - - - - - - //@Test - public void testTwoIndexLargeQuery() throws Exception { - - SPARQLParser parser = new SPARQLParser(); - - - ParsedQuery pq1 = parser.parseQuery(q15, null); - ParsedQuery pq2 = parser.parseQuery(q7, null); - ParsedQuery pq3 = parser.parseQuery(q12, null); - - - - System.out.println("Query is " + pq1.getTupleExpr()); - - SimpleExternalTupleSet extTup1 = new SimpleExternalTupleSet(new Projection(pq2.getTupleExpr())); - SimpleExternalTupleSet extTup2 = new SimpleExternalTupleSet(new Projection(pq3.getTupleExpr())); - //SimpleExternalTupleSet extTup3 = new SimpleExternalTupleSet(new Projection(pq5.getTupleExpr())); - - - List<ExternalTupleSet> list = new ArrayList<ExternalTupleSet>(); - - list.add(extTup2); - //list.add(extTup3); - list.add(extTup1); - - - IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator(pq1.getTupleExpr(),list); - List<ExternalTupleSet> indexSet = iep.getNormalizedIndices(); - Assert.assertEquals(4, indexSet.size()); - -// System.out.println("Normalized indices are: "); -// for(ExternalTupleSet e: indexSet) { -// System.out.println(e.getTupleExpr()); -// } - - Set<TupleExpr> processedTups = Sets.newHashSet(iep.getIndexedTuples()); - - Assert.assertEquals(5, processedTups.size()); - - // System.out.println("Size is " + processedTups.size()); - -// System.out.println("Indexed tuples are :" ); -// for(TupleExpr te: processedTups) { -// System.out.println(te); -// } - - - - - - - } - - - - - - @Test - public void testThreeIndexQuery() throws Exception { - - SPARQLParser parser = new SPARQLParser(); - - - ParsedQuery pq1 = parser.parseQuery(q16, null); - ParsedQuery pq2 = parser.parseQuery(q17, null); - ParsedQuery pq3 = parser.parseQuery(q18, null); - ParsedQuery 
pq4 = parser.parseQuery(q19, null); - - - - System.out.println("Query is " + pq1.getTupleExpr()); - - SimpleExternalTupleSet extTup1 = new SimpleExternalTupleSet(new Projection(pq2.getTupleExpr())); - SimpleExternalTupleSet extTup2 = new SimpleExternalTupleSet(new Projection(pq3.getTupleExpr())); - SimpleExternalTupleSet extTup3 = new SimpleExternalTupleSet(new Projection(pq4.getTupleExpr())); - - - List<ExternalTupleSet> list = new ArrayList<ExternalTupleSet>(); - - list.add(extTup2); - list.add(extTup3); - list.add(extTup1); - - - IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator(pq1.getTupleExpr(),list); - List<ExternalTupleSet> indexSet = iep.getNormalizedIndices(); - Assert.assertEquals(6, indexSet.size()); - -// System.out.println("Normalized indices are: "); -// for(ExternalTupleSet e: indexSet) { -// System.out.println(e.getTupleExpr()); -// } - - Set<TupleExpr> processedTups = Sets.newHashSet(iep.getIndexedTuples()); - - Assert.assertEquals(17, processedTups.size()); - - // System.out.println("Size is " + processedTups.size()); - -// System.out.println("Indexed tuples are :" ); -// for(TupleExpr te: processedTups) { -// System.out.println(te); -// } - - - TupleExecutionPlanGenerator tep = new TupleExecutionPlanGenerator(); - List<TupleExpr> plans = Lists.newArrayList(tep.getPlans(processedTups.iterator())); - - - System.out.println("Size is " + plans.size()); - - System.out.println("Possible indexed tuple plans are :" ); - for(TupleExpr te: plans) { - System.out.println(te); - } - - - - - } - - - - - + private String q7 = ""// + + "SELECT ?s ?t ?u " // + + "{" // + + " ?s a ?t ."// + + " ?t <http://www.w3.org/2000/01/rdf-schema#label> ?u ."// + + " ?u <uri:talksTo> ?s . "// + + "}";// + + private String q12 = ""// + + "SELECT ?b ?p ?dog ?cat " // + + "{" // + + " ?b a ?p ."// + + " ?dog a ?cat. "// + + "}";// + + private String q15 = ""// + + "SELECT ?f ?m ?d ?e ?l ?c " // + + "{" // + + " ?f a ?m ."// + + " ?e a ?l ."// + + " ?d <uri:talksTo> ?f . "// + + " ?c <uri:talksTo> ?e . "// + + " ?m <http://www.w3.org/2000/01/rdf-schema#label> ?d ."// + + " ?l <http://www.w3.org/2000/01/rdf-schema#label> ?c ."// + + "}";// + + private String q16 = ""// + + "SELECT ?f ?m ?d ?e ?l ?c " // + + "{" // + + " ?d <uri:talksTo> ?f . "// + + " ?c <uri:talksTo> ?e . "// + + " ?m <http://www.w3.org/2000/01/rdf-schema#label> ?d ."// + + " ?l <http://www.w3.org/2000/01/rdf-schema#label> ?c ."// + + "}";// + + private String q17 = ""// + + "SELECT ?dog ?cat ?chicken " // + + "{" // + + " ?chicken <uri:talksTo> ?dog . "// + + " ?cat <http://www.w3.org/2000/01/rdf-schema#label> ?chicken ."// + + "}";// + + private String q18 = ""// + + "SELECT ?dog ?chicken " // + + "{" // + + " ?chicken <uri:talksTo> ?dog . 
"// + + "}";// + + private String q19 = ""// + + "SELECT ?cat ?chicken " // + + "{" // + + " ?cat <http://www.w3.org/2000/01/rdf-schema#label> ?chicken ."// + + "}";// + + @Test + public void testTwoIndexLargeQuery() throws Exception { + + SPARQLParser parser = new SPARQLParser(); + + ParsedQuery pq1 = parser.parseQuery(q15, null); + ParsedQuery pq2 = parser.parseQuery(q7, null); + ParsedQuery pq3 = parser.parseQuery(q12, null); + + System.out.println("Query is " + pq1.getTupleExpr()); + + SimpleExternalTupleSet extTup1 = new SimpleExternalTupleSet( + new Projection(pq2.getTupleExpr())); + SimpleExternalTupleSet extTup2 = new SimpleExternalTupleSet( + new Projection(pq3.getTupleExpr())); + + List<ExternalTupleSet> list = new ArrayList<ExternalTupleSet>(); + + list.add(extTup2); + list.add(extTup1); + + IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator( + pq1.getTupleExpr(), list); + List<ExternalTupleSet> indexSet = iep.getNormalizedIndices(); + Assert.assertEquals(4, indexSet.size()); + + Set<TupleExpr> processedTups = Sets.newHashSet(iep.getIndexedTuples()); + Assert.assertEquals(5, processedTups.size()); + + } + + @Test + public void testThreeIndexQuery() throws Exception { + + SPARQLParser parser = new SPARQLParser(); + + ParsedQuery pq1 = parser.parseQuery(q16, null); + ParsedQuery pq2 = parser.parseQuery(q17, null); + ParsedQuery pq3 = parser.parseQuery(q18, null); + ParsedQuery pq4 = parser.parseQuery(q19, null); + + System.out.println("Query is " + pq1.getTupleExpr()); + + SimpleExternalTupleSet extTup1 = new SimpleExternalTupleSet( + new Projection(pq2.getTupleExpr())); + SimpleExternalTupleSet extTup2 = new SimpleExternalTupleSet( + new Projection(pq3.getTupleExpr())); + SimpleExternalTupleSet extTup3 = new SimpleExternalTupleSet( + new Projection(pq4.getTupleExpr())); + + List<ExternalTupleSet> list = new ArrayList<ExternalTupleSet>(); + + list.add(extTup2); + list.add(extTup3); + list.add(extTup1); + + IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator( + pq1.getTupleExpr(), list); + List<ExternalTupleSet> indexSet = iep.getNormalizedIndices(); + Assert.assertEquals(6, indexSet.size()); + + Set<TupleExpr> processedTups = Sets.newHashSet(iep.getIndexedTuples()); + + Assert.assertEquals(17, processedTups.size()); + + TupleExecutionPlanGenerator tep = new TupleExecutionPlanGenerator(); + List<TupleExpr> plans = Lists.newArrayList(tep.getPlans(processedTups + .iterator())); + + System.out.println("Size is " + plans.size()); + + System.out.println("Possible indexed tuple plans are :"); + for (TupleExpr te : plans) { + System.out.println(te); + } + } }
