http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java new file mode 100644 index 0000000..47a8502 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java @@ -0,0 +1,625 @@ +package mvm.rya.indexing.external.tupleSet; + +/* + * #%L + * mvm.rya.indexing.accumulo + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import info.aduna.iteration.CloseableIteration; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.Set; + +import mvm.rya.accumulo.precompQuery.AccumuloPrecompQueryIndexer; +import mvm.rya.rdftriplestore.evaluation.ExternalBatchingIterator; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.openrdf.model.Literal; +import org.openrdf.model.URI; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.Binding; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.QueryModelNode; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.ValueExpr; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.impl.EmptyBindingSet; +import org.openrdf.query.parser.ParsedTupleQuery; +import 
org.openrdf.query.parser.sparql.SPARQLParser;
import org.openrdf.repository.sail.SailRepositoryConnection;
import org.openrdf.sail.SailException;

import com.google.common.base.Joiner;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

/**
 * An {@link ExternalTupleSet} whose results are pre-computed and stored in an
 * Accumulo table.
 *
 * <p>Table layout (established by the materializing constructor): for every
 * result binding set and every cyclic shift of the binding-variable list, one
 * row whose row-id is the "\u0000"-delimited sequence of serialized binding
 * values in that shifted order, and whose column family is the matching
 * "\u0000"-delimited variable order. One metadata row with row-id "~SPARQL"
 * carries column family = result count, column qualifier = the supported
 * variable orders (";"-separated within an order, "\u0000"-separated between
 * orders), and value = the SPARQL query text.
 */
public class AccumuloIndexSet extends ExternalTupleSet implements ExternalBatchingIterator {

    private static final int WRITER_MAX_WRITE_THREADS = 30;
    // Renamed from WRITER_MAX_LATNECY (typo); value and behavior unchanged.
    private static final long WRITER_MAX_LATENCY = Long.MAX_VALUE;
    private static final long WRITER_MAX_MEMORY = 500L * 1024L * 1024L;

    // Per-variable serializers converting between openrdf Values and the
    // strings stored in the table. Populated by setProjectionExpr(...).
    private Map<String, AccValueFactory> bindings;
    // Assured binding names of the query, in projection order.
    private List<String> bindingslist;
    private final Connector accCon;
    private final String tablename;
    // Number of result rows, as written to / read from the metadata row.
    private long tableSize = 0;
    // The "\u0000"-delimited variable orders supported by the table.
    private List<String> varOrder = null;

    /** Converts between openrdf Values and their stored string form. */
    public static interface AccValueFactory {
        public org.openrdf.model.Value create(String str);

        public String create(org.openrdf.model.Value val);
    }

    /** Serializer for positions that always hold URIs (subject/predicate/context). */
    public static class AccUrlFactory implements AccValueFactory {
        @Override
        public org.openrdf.model.Value create(final String str) {
            return new URIImpl(str);
        }

        @Override
        public String create(org.openrdf.model.Value val) {
            return val.stringValue();
        }
    }

    /**
     * Serializer for object/filter positions, which may hold URIs or literals.
     * Stored form is "lexical\u0001tag" where tag 1 = URI and 2 = literal;
     * values without a tag fall back to a contains-":" heuristic on parse.
     */
    public static class AccValueFactoryImpl implements AccValueFactory {
        @Override
        public org.openrdf.model.Value create(final String str) {
            String[] split = str.split("\u0001");
            if (split.length > 1 && split[1].equals("1")) {
                return new URIImpl(split[0]);
            }
            if (split[0].contains(":")) {
                return new URIImpl(split[0]);
            }
            return new LiteralImpl(split[0]);
        }

        @Override
        public String create(org.openrdf.model.Value val) {
            if (val instanceof URI) {
                return val.stringValue() + "\u0001" + 1;
            }
            if (val instanceof Literal) {
                Literal v = (Literal) val;
                return v.getLabel() + "\u0001" + 2;
            }
            // BNodes (and anything else) have no stored representation.
            return null;
        }
    }

    //TODO set supportedVarOrderMap
    /**
     * Materializing constructor: evaluates {@code sparql} against {@code conn}
     * and writes every result (under every cyclic variable order) plus the
     * "~SPARQL" metadata row into the pre-existing table {@code tablename}.
     *
     * @throws MalformedQueryException if {@code sparql} does not parse
     * @throws TableNotFoundException if {@code tablename} does not exist
     */
    public AccumuloIndexSet(String sparql, SailRepositoryConnection conn, Connector accCon, String tablename)
            throws MalformedQueryException, SailException, QueryEvaluationException,
            MutationsRejectedException, TableNotFoundException {
        super(null);
        this.tablename = tablename;
        this.accCon = accCon;
        SPARQLParser sp = new SPARQLParser();
        ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(sparql, null);

        // Also initializes bindingslist and bindings via the override below;
        // the inline visitor the original duplicated here was redundant.
        setProjectionExpr((Projection) pq.getTupleExpr());

        CloseableIteration<BindingSet, QueryEvaluationException> iter =
                (CloseableIteration<BindingSet, QueryEvaluationException>) conn.getSailConnection()
                        .evaluate(getTupleExpr(), null, new EmptyBindingSet(), false);

        BatchWriter w = accCon.createBatchWriter(tablename, WRITER_MAX_MEMORY, WRITER_MAX_LATENCY,
                WRITER_MAX_WRITE_THREADS);
        // FIX: close the writer and the result iteration even when a write
        // throws (the original leaked both on any exception past this point).
        try {
            varOrder = new ArrayList<String>(bindingslist.size());

            while (iter.hasNext()) {
                BindingSet bs = iter.next();
                for (int j = 0; j < bindingslist.size(); j++) {
                    // TODO recomputing each shift for every result is not efficient
                    List<String> shiftBindingList = listShift(bindingslist, j);
                    StringBuffer sb = new StringBuffer();
                    String order = "";
                    for (String b : shiftBindingList) {
                        String val = bindings.get(b).create(bs.getValue(b));
                        sb.append(val).append("\u0000");
                        if (order.length() == 0) {
                            order = b;
                        } else {
                            order = order + "\u0000" + b;
                        }
                    }

                    // The first result populates varOrder (one entry per shift);
                    // later results reuse the entry at index j.
                    if (varOrder.size() < bindingslist.size()) {
                        varOrder.add(order);
                    }

                    Mutation m = new Mutation(sb.deleteCharAt(sb.length() - 1).toString());
                    m.put(new Text(varOrder.get(j)), new Text(""),
                            new org.apache.accumulo.core.data.Value(new byte[]{}));
                    w.addMutation(m);
                }
                tableSize += 1;
            }

            setLocalityGroups(tablename, accCon, varOrder);
            this.setSupportedVariableOrderMap(createSupportedVarOrderMap(varOrder));

            // Metadata row: "~SPARQL" -> (row count, supported orders, query text).
            String orders = "";
            for (String s : varOrder) {
                s = s.replace("\u0000", ";");
                if (orders.length() == 0) {
                    orders = s;
                } else {
                    orders = orders + "\u0000" + s;
                }
            }

            Mutation m = new Mutation("~SPARQL");
            // NOTE(review): platform-default charset; consider an explicit
            // UTF-8 here and in any reader of this value — TODO confirm.
            Value v = new Value(sparql.getBytes());
            m.put(new Text("" + tableSize), new Text(orders), v);
            w.addMutation(m);
        } finally {
            try {
                w.close();
            } finally {
                iter.close();
            }
        }
    }

    /**
     * Re-open constructor: instead of evaluating the query, reads the
     * "~SPARQL" metadata row of an already-materialized index table to
     * recover the result count and the supported variable orders.
     *
     * @throws IllegalStateException if the table has no metadata row
     */
    public AccumuloIndexSet(String sparql, Connector accCon, String tablename)
            throws MalformedQueryException, SailException, QueryEvaluationException,
            MutationsRejectedException, TableNotFoundException {
        super(null);
        this.tablename = tablename;
        this.accCon = accCon;
        SPARQLParser sp = new SPARQLParser();
        ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(sparql, null);

        // Also initializes bindingslist and bindings via the override below.
        setProjectionExpr((Projection) pq.getTupleExpr());

        Scanner s = accCon.createScanner(tablename, new Authorizations());
        s.setRange(Range.exact(new Text("~SPARQL")));
        Iterator<Entry<Key, Value>> i = s.iterator();

        String[] tempVarOrders = null;
        if (i.hasNext()) {
            Entry<Key, Value> entry = i.next();
            Text ts = entry.getKey().getColumnFamily();
            tempVarOrders = entry.getKey().getColumnQualifier().toString().split("\u0000");
            tableSize = Long.parseLong(ts.toString());
        } else {
            throw new IllegalStateException("Index table contains no metadata!");
        }

        varOrder = Lists.newArrayList();
        for (String t : tempVarOrders) {
            // Orders are stored ";"-separated; convert back to "\u0000".
            varOrder.add(t.replace(";", "\u0000"));
        }

        setLocalityGroups(tablename, accCon, varOrder);
        this.setSupportedVariableOrderMap(createSupportedVarOrderMap(varOrder));
    }

    /** Variable-order prefixes this index can service, keyed by prefix string. */
    @Override
    public Map<String, Set<String>> getSupportedVariableOrders() {
        return this.getSupportedVariableOrderMap();
    }

    /**
     * True if the subset of {@code bindingNames} that are assured bindings of
     * this query matches some supported variable-order prefix set exactly.
     */
    @Override
    public boolean supportsBindingSet(Set<String> bindingNames) {
        Map<String, Set<String>> varOrderMap = this.getSupportedVariableOrders();
        Collection<Set<String>> values = varOrderMap.values();

        Set<String> bNames = Sets.newHashSet();
        for (String s : this.getTupleExpr().getAssuredBindingNames()) {
            if (bindingNames.contains(s)) {
                bNames.add(s);
            }
        }
        return values.contains(bNames);
    }

    /** Returns the stored order key whose variable set equals {@code variables}, or null. */
    private String getVarOrder(Set<String> variables) {
        Map<String, Set<String>> varOrderMap = this.getSupportedVariableOrders();
        for (Map.Entry<String, Set<String>> e : varOrderMap.entrySet()) {
            if (e.getValue().equals(variables)) {
                return e.getKey();
            }
        }
        return null;
    }

    /**
     * Expands an order prefix (in query-variable names) into the full variable
     * order of the locality group it prefixes, translated back to query names.
     *
     * @throws NoSuchElementException if no stored order has this prefix
     */
    private String prefixToOrder(String order) {
        Map<String, String> invMap = HashBiMap.create(this.getTableVarMap()).inverse();
        String[] temp = order.split("\u0000");

        // Translate the prefix into table-variable names.
        for (int i = 0; i < temp.length; i++) {
            temp[i] = this.getTableVarMap().get(temp[i]);
        }
        order = Joiner.on("\u0000").join(temp);

        for (String s : varOrder) {
            if (s.startsWith(order)) {
                // Translate the full matching order back to query names.
                temp = s.split("\u0000");
                for (int i = 0; i < temp.length; i++) {
                    temp[i] = invMap.get(temp[i]);
                }
                return Joiner.on("\u0000").join(temp);
            }
        }
        throw new NoSuchElementException("Order is not a prefix of any locality group value!");
    }

    /** Joins an order list into the "\u0000"-delimited table-variable locality-group name. */
    private String orderToLocGroup(List<String> order) {
        String localityGroup = "";
        for (String s : order) {
            if (localityGroup.length() == 0) {
                localityGroup = this.getTableVarMap().get(s);
            } else {
                localityGroup = localityGroup + "\u0000" + this.getTableVarMap().get(s);
            }
        }
        return localityGroup;
    }

    /** Best-effort: configures one locality group per supported variable order. */
    private void setLocalityGroups(String tableName, Connector conn, List<String> groups) {
        HashMap<String, Set<Text>> localityGroups = new HashMap<String, Set<Text>>();
        for (int i = 0; i < groups.size(); i++) {
            HashSet<Text> tempColumn = new HashSet<Text>();
            tempColumn.add(new Text(groups.get(i)));
            String groupName = groups.get(i).replace("\u0000", "");
            localityGroups.put(groupName, tempColumn);
        }

        // Failures are deliberately non-fatal: the index still works without
        // locality groups, just with worse scan locality.
        try {
            conn.tableOperations().setLocalityGroups(tableName, localityGroups);
        } catch (AccumuloException e) {
            e.printStackTrace();
        } catch (AccumuloSecurityException e) {
            e.printStackTrace();
        } catch (TableNotFoundException e) {
            e.printStackTrace();
        }
    }

    /** Returns {@code list} cyclically shifted left by {@code j} positions. */
    private List<String> listShift(List<String> list, int j) {
        if (j >= list.size()) {
            throw new IllegalArgumentException();
        }
        List<String> shiftList = Lists.newArrayList();
        for (int i = 0; i < list.size(); i++) {
            shiftList.add(list.get((i + j) % list.size()));
        }
        return shiftList;
    }

    /** Names of constant-constraint placeholder vars ("-const-" prefix) in the table var map. */
    private Set<String> getConstantConstraints() {
        Map<String, String> tableMap = this.getTableVarMap();
        Set<String> constants = Sets.newHashSet();
        for (String s : tableMap.keySet()) {
            if (s.startsWith("-const-")) {
                constants.add(s);
            }
        }
        return constants;
    }

    /** Expands each full order into all of its prefixes, mapped to their variable sets. */
    private Map<String, Set<String>> createSupportedVarOrderMap(List<String> orders) {
        Map<String, Set<String>> supportedVars = Maps.newHashMap();
        for (String t : orders) {
            String[] tempOrder = t.split("\u0000");
            Set<String> varSet = Sets.newHashSet();
            String u = "";
            for (String s : tempOrder) {
                if (u.length() == 0) {
                    u = s;
                } else {
                    u = u + "\u0000" + s;
                }
                varSet.add(s);
                supportedVars.put(u, new HashSet<String>(varSet));
            }
        }
        return supportedVars;
    }

    /**
     * Sets the projection and (re)builds {@code bindingslist} plus the
     * per-variable serializers: subject/predicate/context vars get
     * {@link AccUrlFactory}, object and filter vars get
     * {@link AccValueFactoryImpl}. This is the single source of that logic
     * (both constructors route through it).
     */
    @Override
    public void setProjectionExpr(Projection tupleExpr) {
        super.setProjectionExpr(tupleExpr);
        this.bindingslist = Lists.newArrayList(tupleExpr.getAssuredBindingNames());

        this.bindings = Maps.newHashMap();
        tupleExpr.visit(new QueryModelVisitorBase<RuntimeException>() {
            @Override
            public void meet(Var node) {
                QueryModelNode parent = node.getParentNode();
                if (parent instanceof StatementPattern) {
                    StatementPattern statement = (StatementPattern) parent;
                    if (node.equals(statement.getSubjectVar())) {
                        bindings.put(node.getName(), new AccUrlFactory());
                    }
                    if (node.equals(statement.getPredicateVar())) {
                        bindings.put(node.getName(), new AccUrlFactory());
                    }
                    if (node.equals(statement.getObjectVar())) {
                        bindings.put(node.getName(), new AccValueFactoryImpl());
                    }
                    if (node.equals(statement.getContextVar())) {
                        // TODO is this correct?
                        bindings.put(node.getName(), new AccUrlFactory());
                    }
                } else if (parent instanceof ValueExpr) { // bindings associated with Filters
                    bindings.put(node.getName(), new AccValueFactoryImpl());
                }
            }
        });
    }

    @Override
    public String getSignature() {
        return "AccumuloIndexSet(" + tablename + ") : " + Joiner.on(", ").join(bindingslist);
    }

    @Override
    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindingset)
            throws QueryEvaluationException {
        return this.evaluate(Collections.singleton(bindingset));
    }

    /** Cardinality = number of precomputed results (from the metadata row). */
    @Override
    public double cardinality() {
        return tableSize;
    }

    /**
     * Evaluates the precomputed join against a batch of binding sets. The
     * first binding set is sampled to determine which variables are already
     * bound; those (plus constant constraints) select the variable-order
     * prefix / locality group used for the scan.
     *
     * @throws IllegalStateException if the bound variables match no stored order
     */
    @Override
    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final Collection<BindingSet> bindingset)
            throws QueryEvaluationException {
        String localityGroup = "";
        Set<String> commonVars = Sets.newHashSet();

        if (!bindingset.isEmpty()) {
            BindingSet bs = bindingset.iterator().next();
            for (String b : bindingslist) {
                Binding v = bs.getBinding(b);
                if (v != null) {
                    commonVars.add(b);
                }
            }
        }
        commonVars.addAll(getConstantConstraints());

        AccumuloPrecompQueryIndexer apq = null;
        List<String> fullVarOrder = null;
        try {
            if (commonVars.size() > 0) {
                String commonVarOrder = getVarOrder(commonVars);
                if (commonVarOrder == null) {
                    throw new IllegalStateException("Index does not support binding set!");
                }
                fullVarOrder = Lists.newArrayList(prefixToOrder(commonVarOrder).split("\u0000"));
                localityGroup = orderToLocGroup(fullVarOrder);
                // Trailing element encodes the prefix length for the indexer.
                fullVarOrder.add("" + commonVars.size());
            } else {
                // FIX: copy bindingslist before appending the prefix-length
                // marker; the original aliased the field here and appended
                // "0" to it, corrupting this index set for all later calls.
                fullVarOrder = Lists.newArrayList(bindingslist);
                localityGroup = orderToLocGroup(fullVarOrder);
                fullVarOrder.add("" + 0);
            }

            apq = new AccumuloPrecompQueryIndexer(accCon, tablename);
            ValueMapVisitor vmv = new ValueMapVisitor();
            this.getTupleExpr().visit(vmv);
            return apq.queryPrecompJoin(fullVarOrder, localityGroup, this.bindings, vmv.getValMap(), bindingset);
        } catch (TableNotFoundException e) {
            throw new QueryEvaluationException(e);
        } finally {
            IOUtils.closeQuietly(apq);
        }
    }

    /** Collects the constant values bound to "-const-" vars in the query tree. */
    public class ValueMapVisitor extends QueryModelVisitorBase<RuntimeException> {

        Map<String, org.openrdf.model.Value> valMap = Maps.newHashMap();

        public Map<String, org.openrdf.model.Value> getValMap() {
            return valMap;
        }

        @Override
        public void meet(Var node) {
            if (node.getName().startsWith("-const-")) {
                valMap.put(node.getName(), node.getValue());
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/ExternalTupleSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/ExternalTupleSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/ExternalTupleSet.java new file mode 100644 index 0000000..7061cac --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/ExternalTupleSet.java @@ -0,0 +1,216 @@ +package mvm.rya.indexing.external.tupleSet; + +/* + * #%L + * mvm.rya.indexing.accumulo + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import info.aduna.iteration.CloseableIteration; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.QueryModelNode; +import org.openrdf.query.algebra.QueryModelVisitor; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.evaluation.impl.ExternalSet; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; + +import com.beust.jcommander.internal.Sets; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * Abstract class for an External Tuple Set. This Tuple + */ +public abstract class ExternalTupleSet extends ExternalSet { + + private Projection tupleExpr; + private Map<String, String> tableVarMap = Maps.newHashMap(); + private Map<String, Set<String>> supportedVarOrders = Maps.newHashMap(); + + + public ExternalTupleSet() { + + } + + public ExternalTupleSet(Projection tupleExpr) { + this.tupleExpr = tupleExpr; + } + + @Override + abstract public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) throws QueryEvaluationException; + + @Override + public Set<String> getBindingNames() { + return tupleExpr.getBindingNames(); + } + + @Override + public Set<String> getAssuredBindingNames() { + return tupleExpr.getAssuredBindingNames(); + } + + @Override + public String getSignature() { + return "(External Projection) " + Joiner.on(", ").join(tupleExpr.getProjectionElemList().getElements()).replaceAll("\\s+", " "); + } + + public Projection getTupleExpr() { + return tupleExpr; + } + + public void setProjectionExpr(Projection tupleExpr) { + this.tupleExpr = tupleExpr; + } + + + public void setTableVarMap(Map<String,String> vars) { + this.tableVarMap = vars; + } + + + public Map<String, String> 
getTableVarMap() { + return this.tableVarMap; + } + + + public void setSupportedVariableOrderMap(Map<String, Set<String>> varOrders) { + this.supportedVarOrders = varOrders; + } + + + public Map<String, Set<String>> getSupportedVariableOrderMap() { + return supportedVarOrders; + } + + + public void updateTupleExp(final Map<Var, Var> oldToNewBindings) { + tupleExpr.visit(new QueryModelVisitorBase<RuntimeException>() { + @Override + public void meet(Var var) { + if (oldToNewBindings.containsKey(var)) { + var.replaceWith(oldToNewBindings.get(var)); + } + } + }); + } + + @Override + public ExternalSet clone() { + ExternalTupleSet clone = (ExternalTupleSet) super.clone(); + clone.tupleExpr = this.tupleExpr.clone(); + clone.tableVarMap = Maps.newHashMap(); + for(String s: this.tableVarMap.keySet()) { + clone.tableVarMap.put(s,this.tableVarMap.get(s)); + } + clone.supportedVarOrders = Maps.newHashMap(); + for(String s: this.supportedVarOrders.keySet()) { + clone.supportedVarOrders.put(s,this.supportedVarOrders.get(s)); + } + return clone; + } + + + public Map<String, Set<String>> getSupportedVariableOrders() { + + if (supportedVarOrders.size() != 0) { + return supportedVarOrders; + } else { + + Set<String> varSet = Sets.newHashSet(); + String t = ""; + + for (String s : tupleExpr.getAssuredBindingNames()) { + if (t.length() == 0) { + t = s; + } else { + t = t + "\u0000" + s; + } + + varSet.add(s); + supportedVarOrders.put(t, new HashSet<String>(varSet)); + + } + + return supportedVarOrders; + } + } + + + + + public boolean supportsBindingSet(Set<String> bindingNames) { + + Map<String, Set<String>> varOrderMap = getSupportedVariableOrders(); + String bNames = ""; + + for (String s : tupleExpr.getAssuredBindingNames()) { + if (bindingNames.contains(s)) { + if(bNames.length() == 0) { + bNames = s; + } else { + bNames = bNames + "\u0000"+ s; + } + } + } + + return varOrderMap.containsKey(bNames); + } + + + + @Override + public boolean equals(Object other) { + + if (!(other 
instanceof ExternalTupleSet)) { + return false; + } else { + + ExternalTupleSet arg = (ExternalTupleSet) other; + if (this.getTupleExpr().equals(arg.getTupleExpr())) { + return true; + } else { + return false; + } + + } + + } + + + @Override + public int hashCode() { + int result = 17; + result = 31*result + tupleExpr.hashCode(); + + return result; + } + + + + + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/SimpleExternalTupleSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/SimpleExternalTupleSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/SimpleExternalTupleSet.java new file mode 100644 index 0000000..f98ddc8 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/SimpleExternalTupleSet.java @@ -0,0 +1,87 @@ +package mvm.rya.indexing.external.tupleSet; + +/* + * #%L + * mvm.rya.rya.indexing + * %% + * Copyright (C) 2014 - 2015 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import info.aduna.iteration.CloseableIteration; + +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.QueryModelVisitor; + +import com.google.common.base.Joiner; + + + + + + +public class SimpleExternalTupleSet extends ExternalTupleSet { + + + + public SimpleExternalTupleSet(Projection tuple) { + super(); + this.setProjectionExpr(tuple); + + } + + @Override + public <X extends Exception> void visit(QueryModelVisitor<X> visitor) + throws X + { + visitor.meetOther(this); + } + + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) + throws QueryEvaluationException { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getSignature() { + return "(SimpleExternalTupleSet) " + + Joiner.on(", ").join(this.getTupleExpr().getProjectionElemList().getElements()).replaceAll("\\s+", " "); + + } + + @Override + public boolean equals(Object other) { + + if (!(other instanceof SimpleExternalTupleSet)) { + return false; + } else { + + SimpleExternalTupleSet arg = (SimpleExternalTupleSet) other; + if (this.getTupleExpr().equals(arg.getTupleExpr())) { + return true; + } else { + return false; + } + + } + + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/AbstractMongoIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/AbstractMongoIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/AbstractMongoIndexer.java new file mode 100644 index 0000000..dd61edc --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/AbstractMongoIndexer.java @@ -0,0 +1,54 @@ +package mvm.rya.indexing.mongodb; + +import java.io.IOException; +import java.util.Collection; + +import 
org.apache.hadoop.conf.Configuration; +import org.openrdf.model.Statement; + +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.persist.index.RyaSecondaryIndexer; + +public abstract class AbstractMongoIndexer implements RyaSecondaryIndexer { + + @Override + public void close() throws IOException { + } + + @Override + public void flush() throws IOException { + } + + + @Override + public Configuration getConf() { + return null; + } + + + @Override + public String getTableName() { + return null; + } + + @Override + public void storeStatements(Collection<RyaStatement> ryaStatements) + throws IOException { + for (RyaStatement ryaStatement : ryaStatements){ + storeStatement(ryaStatement); + } + + } + + @Override + public void deleteStatement(RyaStatement stmt) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropGraph(RyaURI... graphs) { + throw new UnsupportedOperationException(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/GeoMongoDBStorageStrategy.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/GeoMongoDBStorageStrategy.java b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/GeoMongoDBStorageStrategy.java new file mode 100644 index 0000000..d945ba3 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/GeoMongoDBStorageStrategy.java @@ -0,0 +1,166 @@ +package mvm.rya.indexing.mongodb; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import mvm.rya.indexing.StatementContraints; +import mvm.rya.indexing.accumulo.StatementSerializer; +import mvm.rya.indexing.accumulo.geo.GeoParseUtils; + +import org.apache.commons.codec.binary.Hex; 
+import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.model.impl.ValueFactoryImpl; + +import com.mongodb.BasicDBList; +import com.mongodb.BasicDBObject; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; +import com.vividsolutions.jts.geom.Coordinate; +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.io.ParseException; +import com.vividsolutions.jts.io.WKTReader; + +public class GeoMongoDBStorageStrategy { + + private static final String ID = "_id"; + private static final String GEO = "location"; + private static final String CONTEXT = "context"; + private static final String PREDICATE = "predicate"; + private static final String OBJECT = "object"; + private static final String SUBJECT = "subject"; + public enum GeoQueryType { + INTERSECTS { + public String getKeyword() { + return "$geoIntersects"; + } + }, WITHIN { + public String getKeyword() { + return "$geoWithin"; + } + }, + EQUALS { + public String getKeyword() { + return "$near"; + } + }; + + public abstract String getKeyword(); + } + + private double maxDistance; + + + public GeoMongoDBStorageStrategy(double maxDistance) { + this.maxDistance = maxDistance; + } + + public void createIndices(DBCollection coll){ + coll.createIndex("{" + GEO + " : \"2dsphere\"" ); + } + + public DBObject getQuery(StatementContraints contraints, Geometry geo, GeoQueryType queryType) { + BasicDBObject query; + if (queryType.equals(GeoQueryType.EQUALS)){ + List<double[]> points = getCorrespondingPoints(geo); + if (points.size() == 1){ + List circle = new ArrayList(); + circle.add(points.get(0)); + circle.add(maxDistance); + BasicDBObject polygon = new BasicDBObject("$centerSphere", circle); + query = new BasicDBObject(GEO, new BasicDBObject(GeoQueryType.WITHIN.getKeyword(), polygon)); + }else { + query = new BasicDBObject(GEO, points); + } + + } + else { + query = new BasicDBObject(GEO, new BasicDBObject(queryType.getKeyword(), new BasicDBObject("$polygon", 
getCorrespondingPoints(geo)))); + } + if (contraints.hasSubject()){ + query.append(SUBJECT, contraints.getSubject().toString()); + } + if (contraints.hasPredicates()){ + Set<URI> predicates = contraints.getPredicates(); + if (predicates.size() > 1){ + BasicDBList or = new BasicDBList(); + for (URI pred : predicates){ + DBObject currentPred = new BasicDBObject(PREDICATE, pred.toString()); + or.add(currentPred); + } + query.append("$or", or); + } + else if (!predicates.isEmpty()){ + query.append(PREDICATE, predicates.iterator().next().toString()); + } + } + if (contraints.hasContext()){ + query.append(CONTEXT, contraints.getContext().toString()); + } + + return query; + } + + + public Statement deserializeDBObject(DBObject queryResult) { + Map result = queryResult.toMap(); + String subject = (String) result.get(SUBJECT); + String object = (String) result.get(OBJECT); + String predicate = (String) result.get(PREDICATE); + String context = (String) result.get(CONTEXT); + if (!context.isEmpty()){ + return StatementSerializer.readStatement(subject, predicate, object, context); + } + return StatementSerializer.readStatement(subject, predicate, object); + } + + + + public DBObject serialize(Statement statement) throws ParseException{ + // if the object is wkt, then try to index it + // write the statement data to the fields + Geometry geo = (new WKTReader()).read(GeoParseUtils.getWellKnownText(statement)); + if(geo == null || geo.isEmpty() || !geo.isValid()) { + throw new ParseException("Could not create geometry for statement " + statement); + } + + String context = ""; + if (statement.getContext() != null){ + context = StatementSerializer.writeContext(statement); + } + String id = StatementSerializer.writeSubject(statement) + " " + + StatementSerializer.writePredicate(statement) + " " + StatementSerializer.writeObject(statement) + " " + context; + byte[] bytes = id.getBytes(); + try { + MessageDigest digest = MessageDigest.getInstance("SHA-1"); + bytes = 
digest.digest(bytes); + } catch (NoSuchAlgorithmException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + BasicDBObject doc = new BasicDBObject(ID, new String(Hex.encodeHex(bytes))) + .append(GEO, getCorrespondingPoints(geo)) + .append(SUBJECT, StatementSerializer.writeSubject(statement)) + .append(PREDICATE, StatementSerializer.writePredicate(statement)) + .append(OBJECT, StatementSerializer.writeObject(statement)) + .append(CONTEXT, context); + return doc; + + } + + private List<double[]> getCorrespondingPoints(Geometry geo){ + List<double[]> points = new ArrayList<double[]>(); + for (Coordinate coord : geo.getCoordinates()){ + points.add(new double[] { + coord.x, coord.y + }); + } + return points; + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/MongoGeoIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/MongoGeoIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/MongoGeoIndexer.java new file mode 100644 index 0000000..35e69b1 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/MongoGeoIndexer.java @@ -0,0 +1,243 @@ +package mvm.rya.indexing.mongodb; + +import info.aduna.iteration.CloseableIteration; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Set; + +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.RyaToRdfConversions; +import mvm.rya.indexing.GeoIndexer; +import mvm.rya.indexing.StatementContraints; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; +import mvm.rya.indexing.mongodb.GeoMongoDBStorageStrategy.GeoQueryType; +import mvm.rya.mongodb.MongoDBRdfConfiguration; + +import org.apache.hadoop.conf.Configuration; +import 
org.apache.log4j.Logger; +import org.opengis.feature.simple.SimpleFeature; +import org.openrdf.model.Literal; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.query.QueryEvaluationException; + +import com.mongodb.DB; +import com.mongodb.DBCollection; +import com.mongodb.DBCursor; +import com.mongodb.DBObject; +import com.mongodb.MongoClient; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.io.ParseException; + +public class MongoGeoIndexer extends AbstractMongoIndexer implements GeoIndexer{ + + private static final Logger logger = Logger.getLogger(MongoGeoIndexer.class); + + private GeoMongoDBStorageStrategy storageStrategy; + private MongoClient mongoClient; + private DB db; + private DBCollection coll; + private Set<URI> predicates; + private Configuration conf; + private boolean isInit = false; + private String tableName = ""; + + + + private void init() throws NumberFormatException, UnknownHostException{ + ServerAddress server = new ServerAddress(conf.get(MongoDBRdfConfiguration.MONGO_INSTANCE), + Integer.valueOf(conf.get(MongoDBRdfConfiguration.MONGO_INSTANCE_PORT))); + this.conf = conf; + if (conf.get(MongoDBRdfConfiguration.MONGO_USER) != null){ + MongoCredential cred = MongoCredential.createCredential(conf.get(MongoDBRdfConfiguration.MONGO_USER), conf.get(MongoDBRdfConfiguration.MONGO_USER_PASSWORD), + conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME).toCharArray()); + mongoClient = new MongoClient(server, Arrays.asList(cred)); + } + else { + mongoClient = new MongoClient(server); + } + predicates = ConfigUtils.getGeoPredicates(conf); + tableName = conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME); + db = mongoClient.getDB(tableName); + coll = db.getCollection(conf.get(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya") + "_geo"); + storageStrategy = new 
GeoMongoDBStorageStrategy(Double.valueOf(conf.get(MongoDBRdfConfiguration.MONGO_GEO_MAXDISTANCE, "1e-10"))); + } + + + @Override + public String getTableName() { + return tableName; + } + + @Override + public Configuration getConf() { + return conf; + } + + //setConf initializes because index is created via reflection + @Override + public void setConf(Configuration conf) { + this.conf = conf; + if (!isInit) { + try { + init(); + isInit = true; + } catch (NumberFormatException e) { + logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); + throw new RuntimeException(e); + } catch (UnknownHostException e) { + logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); + throw new RuntimeException(e); + } + } + } + + + + private void storeStatement(Statement statement) throws IOException { + // if this is a valid predicate and a valid geometry + boolean isValidPredicate = predicates.isEmpty() || predicates.contains(statement.getPredicate()); + + if (isValidPredicate && (statement.getObject() instanceof Literal)) { + + // add it to the collection + try { + DBObject obj = storageStrategy.serialize(statement); + if (obj != null){ + coll.insert(obj); + } + } + catch (com.mongodb.MongoException.DuplicateKey exception){ + // ignore + } + catch (com.mongodb.DuplicateKeyException exception){ + // ignore + } + catch (Exception ex){ + // ignore single exceptions + ex.printStackTrace(); + } + } + } + + + @Override + public void storeStatement(RyaStatement statement) throws IOException { + storeStatement(RyaToRdfConversions.convertStatement(statement)); + } + + + + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryEquals( + Geometry query, StatementContraints contraints) { + DBObject queryObj = storageStrategy.getQuery(contraints, query, GeoQueryType.EQUALS); + return getIteratorWrapper(queryObj, coll, storageStrategy); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> 
queryDisjoint( + Geometry query, StatementContraints contraints) { + throw new UnsupportedOperationException("Disjoint queries are not supported in Mongo DB."); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryIntersects( + Geometry query, StatementContraints contraints) { + DBObject queryObj = storageStrategy.getQuery(contraints, query, GeoQueryType.INTERSECTS); + return getIteratorWrapper(queryObj, coll, storageStrategy); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryTouches( + Geometry query, StatementContraints contraints) { + throw new UnsupportedOperationException("Touches queries are not supported in Mongo DB."); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryCrosses( + Geometry query, StatementContraints contraints) { + throw new UnsupportedOperationException("Crosses queries are not supported in Mongo DB."); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryWithin( + Geometry query, StatementContraints contraints) { + DBObject queryObj = storageStrategy.getQuery(contraints, query, GeoQueryType.WITHIN); + return getIteratorWrapper(queryObj, coll, storageStrategy); + } + + + private CloseableIteration<Statement, QueryEvaluationException> getIteratorWrapper(final DBObject query, final DBCollection coll, final GeoMongoDBStorageStrategy storageStrategy) { + + return new CloseableIteration<Statement, QueryEvaluationException>() { + + private DBCursor cursor = null; + + private DBCursor getIterator() throws QueryEvaluationException { + if (cursor == null){ + cursor = coll.find(query); + } + return cursor; + } + + @Override + public boolean hasNext() throws QueryEvaluationException { + return getIterator().hasNext(); + } + + @Override + public Statement next() throws QueryEvaluationException { + DBObject feature = getIterator().next(); + return storageStrategy.deserializeDBObject(feature); + } + + @Override 
+ public void remove() { + throw new UnsupportedOperationException("Remove not implemented"); + } + + @Override + public void close() throws QueryEvaluationException { + getIterator().close(); + } + }; + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryContains( + Geometry query, StatementContraints contraints) { + throw new UnsupportedOperationException("Contains queries are not supported in Mongo DB."); + } + + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryOverlaps( + Geometry query, StatementContraints contraints) { + throw new UnsupportedOperationException("Overlaps queries are not supported in Mongo DB."); + } + + @Override + public Set<URI> getIndexablePredicates() { + return predicates; + } + + @Override + public void flush() throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void close() throws IOException { + mongoClient.close(); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/MongoGeoTupleSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/MongoGeoTupleSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/MongoGeoTupleSet.java new file mode 100644 index 0000000..a325b06 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/mongodb/MongoGeoTupleSet.java @@ -0,0 +1,341 @@ +package mvm.rya.indexing.mongodb; + +import info.aduna.iteration.CloseableIteration; + +import java.util.Map; +import java.util.Set; + +import mvm.rya.indexing.GeoIndexer; +import mvm.rya.indexing.IndexingExpr; +import mvm.rya.indexing.IteratorFactory; +import mvm.rya.indexing.SearchFunction; +import mvm.rya.indexing.StatementContraints; +import mvm.rya.indexing.accumulo.geo.GeoConstants; +import mvm.rya.indexing.accumulo.geo.GeoTupleSet; +import 
mvm.rya.indexing.external.tupleSet.ExternalTupleSet; + +import org.apache.hadoop.conf.Configuration; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; + +import com.google.common.base.Joiner; +import com.google.common.collect.Maps; +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.io.ParseException; +import com.vividsolutions.jts.io.WKTReader; + +public class MongoGeoTupleSet extends ExternalTupleSet { + + private Configuration conf; + private GeoIndexer geoIndexer; + private IndexingExpr filterInfo; + + + public MongoGeoTupleSet(IndexingExpr filterInfo, GeoIndexer geoIndexer) { + this.filterInfo = filterInfo; + this.geoIndexer = geoIndexer; + this.conf = geoIndexer.getConf(); + } + + @Override + public Set<String> getBindingNames() { + return filterInfo.getBindingNames(); + } + + public GeoTupleSet clone() { + return new GeoTupleSet(filterInfo, geoIndexer); + } + + @Override + public double cardinality() { + return 0.0; // No idea how the estimate cardinality here. + } + + + @Override + public String getSignature() { + return "(GeoTuple Projection) " + "variables: " + Joiner.on(", ").join(this.getBindingNames()).replaceAll("\\s+", " "); + } + + + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + if (!(other instanceof MongoGeoTupleSet)) { + return false; + } + MongoGeoTupleSet arg = (MongoGeoTupleSet) other; + return this.filterInfo.equals(arg.filterInfo); + } + + @Override + public int hashCode() { + int result = 17; + result = 31*result + filterInfo.hashCode(); + + return result; + } + + + + /** + * Returns an iterator over the result set of the contained IndexingExpr. + * <p> + * Should be thread-safe (concurrent invocation {@link OfflineIterable} this + * method can be expected with some query evaluators. 
+ */ + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) + throws QueryEvaluationException { + + + URI funcURI = filterInfo.getFunction(); + SearchFunction searchFunction = (new MongoGeoSearchFunctionFactory(conf)).getSearchFunction(funcURI); + if(filterInfo.getArguments().length > 1) { + throw new IllegalArgumentException("Index functions do not support more than two arguments."); + } + + String queryText = filterInfo.getArguments()[0].stringValue(); + + return IteratorFactory.getIterator(filterInfo.getSpConstraint(), bindings, queryText, searchFunction); + } + + + + //returns appropriate search function for a given URI + //search functions used in GeoMesaGeoIndexer to access index + public class MongoGeoSearchFunctionFactory { + + Configuration conf; + + private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap(); + + public MongoGeoSearchFunctionFactory(Configuration conf) { + this.conf = conf; + } + + + /** + * Get a {@link GeoSearchFunction} for a given URI. 
+ * + * @param searchFunction + * @return + */ + public SearchFunction getSearchFunction(final URI searchFunction) { + + SearchFunction geoFunc = null; + + try { + geoFunc = getSearchFunctionInternal(searchFunction); + } catch (QueryEvaluationException e) { + e.printStackTrace(); + } + + return geoFunc; + } + + private SearchFunction getSearchFunctionInternal(final URI searchFunction) throws QueryEvaluationException { + SearchFunction sf = SEARCH_FUNCTION_MAP.get(searchFunction); + + if (sf != null) { + return sf; + } else { + throw new QueryEvaluationException("Unknown Search Function: " + searchFunction.stringValue()); + } + } + + private final SearchFunction GEO_EQUALS = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementContraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_EQUALS"; + }; + }; + + private final SearchFunction GEO_DISJOINT = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementContraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_DISJOINT"; + }; + }; + + private final SearchFunction GEO_INTERSECTS = new SearchFunction() { + + 
@Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementContraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_INTERSECTS"; + }; + }; + + private final SearchFunction GEO_TOUCHES = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementContraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_TOUCHES"; + }; + }; + + private final SearchFunction GEO_CONTAINS = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementContraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_CONTAINS"; + }; + }; + + private final SearchFunction GEO_OVERLAPS = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> 
performSearch(String queryText, + StatementContraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_OVERLAPS"; + }; + }; + + private final SearchFunction GEO_CROSSES = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementContraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_CROSSES"; + }; + }; + + private final SearchFunction GEO_WITHIN = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementContraints contraints) throws QueryEvaluationException { + try { + WKTReader reader = new WKTReader(); + Geometry geometry = reader.read(queryText); + CloseableIteration<Statement, QueryEvaluationException> statements = geoIndexer.queryWithin( + geometry, contraints); + return statements; + } catch (ParseException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "GEO_WITHIN"; + }; + }; + + { + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_EQUALS, GEO_EQUALS); + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_DISJOINT, GEO_DISJOINT); + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_INTERSECTS, GEO_INTERSECTS); + 
SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_TOUCHES, GEO_TOUCHES); + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_CONTAINS, GEO_CONTAINS); + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_OVERLAPS, GEO_OVERLAPS); + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_CROSSES, GEO_CROSSES); + SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_WITHIN, GEO_WITHIN); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/test/java/ValidIndexCombinationGeneratorTest.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/ValidIndexCombinationGeneratorTest.java b/extras/indexing/src/test/java/ValidIndexCombinationGeneratorTest.java new file mode 100644 index 0000000..1e295b4 --- /dev/null +++ b/extras/indexing/src/test/java/ValidIndexCombinationGeneratorTest.java @@ -0,0 +1,488 @@ +import static org.junit.Assert.*; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import junit.framework.Assert; +import mvm.rya.indexing.IndexPlanValidator.IndexedExecutionPlanGenerator; +import mvm.rya.indexing.IndexPlanValidator.ValidIndexCombinationGenerator; +import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; +import mvm.rya.indexing.external.tupleSet.SimpleExternalTupleSet; + +import org.junit.Test; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import com.google.common.collect.Lists; + + +public class ValidIndexCombinationGeneratorTest { + + + + + + + @Test + public void singleIndex() { + String q1 = ""// + + "SELECT ?f ?m ?d " // + + "{" // + + " ?f a ?m ."// + + " ?m <http://www.w3.org/2000/01/rdf-schema#label> ?d ."// + + " ?d <uri:talksTo> ?f . "// + + " ?f <uri:hangOutWith> ?m ." // + + " ?m <uri:hangOutWith> ?d ." 
// + + " ?f <uri:associatesWith> ?m ." // + + " ?m <uri:associatesWith> ?d ." // + + "}";// + + + + + + + SPARQLParser parser = new SPARQLParser(); + ParsedQuery pq1 = null; + + + SimpleExternalTupleSet extTup1 = null; + + + + + + + try { + pq1 = parser.parseQuery(q1, null); + + + + extTup1 = new SimpleExternalTupleSet((Projection) pq1.getTupleExpr()); + + + } catch (MalformedQueryException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + List<ExternalTupleSet> indexList = Lists.newArrayList(); + indexList.add(extTup1); + + + ValidIndexCombinationGenerator vic = new ValidIndexCombinationGenerator(pq1.getTupleExpr()); + Iterator<List<ExternalTupleSet>> combos = vic.getValidIndexCombos(indexList); + int size = 0; + while(combos.hasNext()) { + combos.hasNext(); + size++; + combos.next(); + combos.hasNext(); + } + + Assert.assertTrue(!combos.hasNext()); + Assert.assertEquals(1,size); + + + } + + + + + + + @Test + public void medQueryEightOverlapIndex() { + String q1 = ""// + + "SELECT ?f ?m ?d " // + + "{" // + + " ?f a ?m ."// + + " ?m <http://www.w3.org/2000/01/rdf-schema#label> ?d ."// + + " ?d <uri:talksTo> ?f . "// + + " ?f <uri:hangOutWith> ?m ." // + + " ?m <uri:hangOutWith> ?d ." // + + " ?f <uri:associatesWith> ?m ." // + + " ?m <uri:associatesWith> ?d ." // + + "}";// + + + String q2 = ""// + + "SELECT ?t ?s ?u " // + + "{" // + + " ?s a ?t ."// + + " ?t <http://www.w3.org/2000/01/rdf-schema#label> ?u ."// + + " ?u <uri:talksTo> ?s . "// + + "}";// + + + String q3 = ""// + + "SELECT ?s ?t ?u " // + + "{" // + + " ?s <uri:hangOutWith> ?t ." // + + " ?t <uri:hangOutWith> ?u ." // + + "}";// + + String q4 = ""// + + "SELECT ?s ?t ?u " // + + "{" // + + " ?s <uri:associatesWith> ?t ." // + + " ?t <uri:associatesWith> ?u ." // + + "}";// + + + String q5 = ""// + + "SELECT ?t ?s ?u " // + + "{" // + + " ?s a ?t ."// + + " ?t <http://www.w3.org/2000/01/rdf-schema#label> ?u ."// + + " ?u <uri:talksTo> ?s . "// + + " ?s <uri:hangOutWith> ?t ." 
// + + " ?t <uri:hangOutWith> ?u ." // + + "}";// + + String q6 = ""// + + "SELECT ?s ?t ?u " // + + "{" // + + " ?s <uri:associatesWith> ?t ." // + + " ?t <uri:associatesWith> ?u ." // + + " ?s <uri:hangOutWith> ?t ." // + + " ?t <uri:hangOutWith> ?u ." // + + "}";// + + + String q7 = ""// + + "SELECT ?s ?t ?u " // + + "{" // + + " ?s <uri:associatesWith> ?t ." // + + " ?t <uri:associatesWith> ?u ." // + + " ?t <uri:hangOutWith> ?u ." // + + "}";// + + + + String q8 = ""// + + "SELECT ?t ?s ?u " // + + "{" // + + " ?s a ?t ."// + + " ?t <http://www.w3.org/2000/01/rdf-schema#label> ?u ."// + + " ?u <uri:talksTo> ?s . "// + + " ?s <uri:associatesWith> ?t ." // + + "}";// + + + String q9 = ""// + + "SELECT ?t ?s ?u " // + + "{" // + + " ?s a ?t ."// + + " ?t <http://www.w3.org/2000/01/rdf-schema#label> ?u ."// + + "}";// + + + + + + + + + + SPARQLParser parser = new SPARQLParser(); + ParsedQuery pq1 = null; + ParsedQuery pq2 = null; + ParsedQuery pq3 = null; + ParsedQuery pq4 = null; + ParsedQuery pq5 = null; + ParsedQuery pq6 = null; + ParsedQuery pq7 = null; + ParsedQuery pq8 = null; + ParsedQuery pq9 = null; + + SimpleExternalTupleSet extTup1 = null; + SimpleExternalTupleSet extTup2 = null; + SimpleExternalTupleSet extTup3 = null; + SimpleExternalTupleSet extTup4 = null; + SimpleExternalTupleSet extTup5 = null; + SimpleExternalTupleSet extTup6 = null; + SimpleExternalTupleSet extTup7 = null; + SimpleExternalTupleSet extTup8 = null; + + + + + + try { + pq1 = parser.parseQuery(q1, null); + pq2 = parser.parseQuery(q2, null); + pq3 = parser.parseQuery(q3, null); + pq4 = parser.parseQuery(q4, null); + pq5 = parser.parseQuery(q5, null); + pq6 = parser.parseQuery(q6, null); + pq7 = parser.parseQuery(q7, null); + pq8 = parser.parseQuery(q8, null); + pq9 = parser.parseQuery(q9, null); + + + extTup1 = new SimpleExternalTupleSet((Projection) pq2.getTupleExpr()); + extTup2 = new SimpleExternalTupleSet((Projection) pq3.getTupleExpr()); + extTup3 = new 
SimpleExternalTupleSet((Projection) pq4.getTupleExpr()); + extTup4 = new SimpleExternalTupleSet((Projection) pq5.getTupleExpr()); + extTup5 = new SimpleExternalTupleSet((Projection) pq6.getTupleExpr()); + extTup6 = new SimpleExternalTupleSet((Projection) pq7.getTupleExpr()); + extTup7 = new SimpleExternalTupleSet((Projection) pq8.getTupleExpr()); + extTup8 = new SimpleExternalTupleSet((Projection) pq9.getTupleExpr()); + + + } catch (MalformedQueryException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + List<ExternalTupleSet> indexList = Lists.newArrayList(); + indexList.add(extTup1); + indexList.add(extTup2); + indexList.add(extTup3); + indexList.add(extTup4); + indexList.add(extTup5); + indexList.add(extTup6); + indexList.add(extTup7); + indexList.add(extTup8); + + + ValidIndexCombinationGenerator vic = new ValidIndexCombinationGenerator(pq1.getTupleExpr()); + Iterator<List<ExternalTupleSet>> combos = vic.getValidIndexCombos(indexList); + int size = 0; + while(combos.hasNext()) { + combos.hasNext(); + size++; + combos.next(); + combos.hasNext(); + } + + Assert.assertTrue(!combos.hasNext()); + Assert.assertEquals(21,size); + + + } + + + + + + @Test + public void largeQuerySixteenIndexTest() { + + + String q1 = ""// + + "SELECT ?f ?m ?d ?e ?l ?c ?n ?o ?p ?a ?h ?r " // + + "{" // + + " ?f a ?m ."// + + " ?e a ?l ."// + + " ?n a ?o ."// + + " ?a a ?h ."// + + " ?m <http://www.w3.org/2000/01/rdf-schema#label> ?d ."// + + " ?l <http://www.w3.org/2000/01/rdf-schema#label> ?c ."// + + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?p ."// + + " ?h <http://www.w3.org/2000/01/rdf-schema#label> ?r ."// + + " ?d <uri:talksTo> ?f . "// + + " ?c <uri:talksTo> ?e . "// + + " ?p <uri:talksTo> ?n . "// + + " ?r <uri:talksTo> ?a . "// + + "}";// + + + String q2 = ""// + + "SELECT ?s ?t ?u " // + + "{" // + + " ?s a ?t ."// + + " ?t <http://www.w3.org/2000/01/rdf-schema#label> ?u ."// + + " ?u <uri:talksTo> ?s . 
"// + + "}";// + + + + String q3 = ""// + + "SELECT ?s ?t ?u ?d ?f ?g " // + + "{" // + + " ?s a ?t ."// + + " ?t <http://www.w3.org/2000/01/rdf-schema#label> ?u ."// + + " ?u <uri:talksTo> ?s . "// + + " ?d a ?f ."// + + " ?f <http://www.w3.org/2000/01/rdf-schema#label> ?g ."// + + " ?g <uri:talksTo> ?d . "// + + "}";// + + + + + SPARQLParser parser = new SPARQLParser(); + + ParsedQuery pq1 = null; + ParsedQuery pq2 = null; + ParsedQuery pq3 = null; + + + try { + pq1 = parser.parseQuery(q1, null); + pq2 = parser.parseQuery(q2, null); + pq3 = parser.parseQuery(q3, null); + + } catch (Exception e) { + e.printStackTrace(); + } + + SimpleExternalTupleSet extTup1 = new SimpleExternalTupleSet((Projection) pq2.getTupleExpr()); + SimpleExternalTupleSet extTup2 = new SimpleExternalTupleSet((Projection) pq3.getTupleExpr()); + + + List<ExternalTupleSet> list = new ArrayList<ExternalTupleSet>(); + + list.add(extTup2); + list.add(extTup1); + + + IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator(pq1.getTupleExpr(), list); + List<ExternalTupleSet> indexSet = iep.getNormalizedIndices(); + + + Assert.assertEquals(16, indexSet.size()); + + ValidIndexCombinationGenerator vic = new ValidIndexCombinationGenerator(pq1.getTupleExpr()); + Iterator<List<ExternalTupleSet>> eSet = vic.getValidIndexCombos(Lists.newArrayList(indexSet)); + + int size = 0; + while(eSet.hasNext()) { + size++; + Assert.assertTrue(eSet.hasNext()); + eSet.next(); + } + + + Assert.assertTrue(!eSet.hasNext()); + Assert.assertEquals(75, size); + + } + + + + + + + @Test + public void largeQueryFourtyIndexTest() { + + + String q1 = ""// + + "SELECT ?f ?m ?d ?e ?l ?c ?n ?o ?p ?a ?h ?r " // + + "{" // + + " ?f a ?m ."// + + " ?e a ?l ."// + + " ?n a ?o ."// + + " ?a a ?h ."// + + " ?m <http://www.w3.org/2000/01/rdf-schema#label> ?d ."// + + " ?l <http://www.w3.org/2000/01/rdf-schema#label> ?c ."// + + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?p ."// + + " ?h 
<http://www.w3.org/2000/01/rdf-schema#label> ?r ."// + + " ?d <uri:talksTo> ?f . "// + + " ?c <uri:talksTo> ?e . "// + + " ?p <uri:talksTo> ?n . "// + + " ?r <uri:talksTo> ?a . "// + + "}";// + + + String q2 = ""// + + "SELECT ?s ?t ?u " // + + "{" // + + " ?s a ?t ."// + + " ?t <http://www.w3.org/2000/01/rdf-schema#label> ?u ."// + + " ?u <uri:talksTo> ?s . "// + + "}";// + + + + String q3 = ""// + + "SELECT ?s ?t ?u ?d ?f ?g " // + + "{" // + + " ?s a ?t ."// + + " ?t <http://www.w3.org/2000/01/rdf-schema#label> ?u ."// + + " ?u <uri:talksTo> ?s . "// + + " ?d a ?f ."// + + " ?f <http://www.w3.org/2000/01/rdf-schema#label> ?g ."// + + " ?g <uri:talksTo> ?d . "// + + "}";// + + + + String q4 = ""// + + "SELECT ?s ?t ?u ?d ?f ?g ?a ?b ?c" // + + "{" // + + " ?s a ?t ."// + + " ?t <http://www.w3.org/2000/01/rdf-schema#label> ?u ."// + + " ?u <uri:talksTo> ?s . "// + + " ?d a ?f ."// + + " ?f <http://www.w3.org/2000/01/rdf-schema#label> ?g ."// + + " ?g <uri:talksTo> ?d . "// + + " ?a a ?b ."// + + " ?b <http://www.w3.org/2000/01/rdf-schema#label> ?c ."// + + " ?c <uri:talksTo> ?a . 
"// + + "}";// + + + SPARQLParser parser = new SPARQLParser(); + + ParsedQuery pq1 = null; + ParsedQuery pq2 = null; + ParsedQuery pq3 = null; + ParsedQuery pq4 = null; + + + try { + pq1 = parser.parseQuery(q1, null); + pq2 = parser.parseQuery(q2, null); + pq3 = parser.parseQuery(q3, null); + pq4 = parser.parseQuery(q4, null); + + } catch (Exception e) { + e.printStackTrace(); + } + + SimpleExternalTupleSet extTup1 = new SimpleExternalTupleSet((Projection) pq2.getTupleExpr()); + SimpleExternalTupleSet extTup2 = new SimpleExternalTupleSet((Projection) pq3.getTupleExpr()); + SimpleExternalTupleSet extTup3 = new SimpleExternalTupleSet((Projection) pq4.getTupleExpr()); + + List<ExternalTupleSet> list = new ArrayList<ExternalTupleSet>(); + + list.add(extTup2); + list.add(extTup1); + list.add(extTup3); + + IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator(pq1.getTupleExpr(), list); + List<ExternalTupleSet> indexSet = iep.getNormalizedIndices(); + Assert.assertEquals(40, indexSet.size()); + + ValidIndexCombinationGenerator vic = new ValidIndexCombinationGenerator(pq1.getTupleExpr()); + Iterator<List<ExternalTupleSet>> eSet = vic.getValidIndexCombos(Lists.newArrayList(indexSet)); + + int size = 0; + while(eSet.hasNext()) { + size++; + Assert.assertTrue(eSet.hasNext()); + eSet.next(); + } + + Assert.assertTrue(!eSet.hasNext()); + Assert.assertEquals(123, size); + } + + + + + + + + + +}
