// Patch source: http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityTupleSet.java
// diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityTupleSet.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityTupleSet.java (new file mode 100644, index 0000000..3944a59)
package mvm.rya.indexing.accumulo.entity;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */


import info.aduna.iteration.CloseableIteration;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import mvm.rya.accumulo.AccumuloRdfConfiguration;
import mvm.rya.accumulo.AccumuloRyaDAO;
import mvm.rya.api.RdfCloudTripleStoreConfiguration;
import mvm.rya.indexing.accumulo.ConfigUtils;
import mvm.rya.indexing.accumulo.entity.StarQuery.CardinalityStatementPattern;
import mvm.rya.joinselect.AccumuloSelectivityEvalDAO;
import mvm.rya.prospector.service.ProspectorServiceEvalStatsDAO;
import mvm.rya.rdftriplestore.RdfCloudTripleStore;
import mvm.rya.rdftriplestore.RdfCloudTripleStoreConnection;
import mvm.rya.rdftriplestore.evaluation.ExternalBatchingIterator;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.commons.io.IOUtils;
import org.openrdf.query.BindingSet;
import org.openrdf.query.QueryEvaluationException;
import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.algebra.Var;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;
import org.openrdf.query.algebra.evaluation.impl.ExternalSet;
import org.openrdf.sail.SailException;

import com.google.common.base.Joiner;

/**
 * An {@link ExternalSet} that answers a {@link StarQuery} (a group of statement patterns sharing one
 * common variable or constant) by querying the entity-centric document index through
 * {@link AccumuloDocIdIndexer}.
 * <p>
 * When statistics and selectivity are both enabled in the configuration, the constructor also computes a
 * cardinality estimate for the whole star query and remembers its cheapest statement pattern, which
 * {@link #evaluate(BindingSet)} may evaluate first to seed the rest of the query.
 */
public class EntityTupleSet extends ExternalSet implements ExternalBatchingIterator {

    /**
     * The cheapest statement pattern is only evaluated ahead of the rest of the star query when its
     * cardinality is strictly below this limit.
     */
    private static final double MIN_SP_CARDINALITY_LIMIT = 1000;

    private StarQuery starQuery;
    private RdfCloudTripleStoreConfiguration conf;
    private Set<String> variables;
    private double cardinality = -1;
    private StatementPattern minSp;
    private double minCard;
    private Connector accCon = null;
    // true when this instance must go straight to the index (it already is the result of the
    // "evaluate the cheapest pattern first" optimization)
    private boolean evalOptUsed = false;

    /**
     * Creates an empty, unconfigured instance. No star query or configuration is set, so the instance is
     * not usable for evaluation until {@link #setStarQuery(StarQuery)} has been called (and even then it
     * has no configuration).
     */
    public EntityTupleSet() {

    }

    /**
     * Creates a tuple set for the given star query and initializes its cardinality information.
     *
     * @param sq the star query to evaluate
     * @param conf the configuration used to reach Accumulo and to decide whether statistics are used
     * @throws IllegalStateException if no Accumulo connector can be obtained from {@code conf}
     */
    public EntityTupleSet(StarQuery sq, RdfCloudTripleStoreConfiguration conf) {
        this.starQuery = sq;
        this.conf = conf;

        variables = new HashSet<String>();
        if (!starQuery.commonVarConstant()) {
            variables.add(starQuery.getCommonVarName());
        }
        variables.addAll(starQuery.getUnCommonVars());

        init();
    }

    /**
     * Same as {@link #EntityTupleSet(StarQuery, RdfCloudTripleStoreConfiguration)}, additionally stating
     * whether the "cheapest pattern first" optimization has already been applied.
     *
     * @param sq the star query to evaluate
     * @param conf the configuration used to reach Accumulo
     * @param evalOptUsed {@code true} to make {@link #evaluate(Collection)} query the index directly
     */
    public EntityTupleSet(StarQuery sq, RdfCloudTripleStoreConfiguration conf, boolean evalOptUsed) {
        this(sq, conf);
        this.evalOptUsed = evalOptUsed;
    }

    /**
     * Obtains the Accumulo connector and computes {@link #cardinality}, {@link #minCard} and
     * {@link #minSp}. Without statistics and selectivity both cardinalities default to 1 and the first
     * node of the star query is used as the cheapest pattern.
     */
    private void init() {
        try {
            accCon = ConfigUtils.getConnector(conf);
        } catch (AccumuloException | AccumuloSecurityException e) {
            // Continuing with a null connector only postpones the failure to a NullPointerException
            // further down, so fail here with the real cause attached.
            throw new IllegalStateException("Unable to obtain an Accumulo connector for the entity index", e);
        }

        if (conf.isUseStats() && conf.isUseSelectivity()) {
            ProspectorServiceEvalStatsDAO evalDao = new ProspectorServiceEvalStatsDAO(accCon, conf);
            evalDao.init();
            AccumuloSelectivityEvalDAO ase = new AccumuloSelectivityEvalDAO(conf, accCon);
            ase.setRdfEvalDAO(evalDao);
            ase.init();

            cardinality = starQuery.getCardinality(ase);
            CardinalityStatementPattern csp = starQuery.getMinCardSp(ase);

            minCard = csp.getCardinality();
            minSp = csp.getSp();
        } else {
            // TODO come up with a better default if cardinality is not
            // initialized
            cardinality = minCard = 1;
            minSp = starQuery.getNodes().get(0);
        }
    }

    @Override
    public Set<String> getBindingNames() {
        return starQuery.getBindingNames();
    }

    @Override
    public Set<String> getAssuredBindingNames() {
        return starQuery.getAssuredBindingNames();
    }

    /**
     * @return the names of the star query's uncommon variables, plus the common variable's name when the
     *         common variable is not a constant
     */
    public Set<String> getVariables() {
        return variables;
    }

    @Override
    public String getSignature() {
        return "(EntityCentric Projection) " + " common Var: " + starQuery.getCommonVarName() + "  variables: "
                + Joiner.on(", ").join(variables).replaceAll("\\s+", " ");
    }

    public StarQuery getStarQuery() {
        return starQuery;
    }

    public void setStarQuery(StarQuery sq) {
        this.starQuery = sq;
    }

    /**
     * @return a new tuple set built from a copy of the star query and the same configuration; the
     *         cardinality information is recomputed by the constructor
     */
    @Override
    public EntityTupleSet clone() {
        StarQuery sq = new StarQuery(starQuery);
        return new EntityTupleSet(sq, conf);
    }

    @Override
    public double cardinality() {
        return cardinality;
    }

    /**
     * @return the cardinality of the cheapest statement pattern of the star query
     */
    public double getMinSpCard() {
        return minCard;
    }

    /**
     * Evaluates the star query for a single binding set.
     * <p>
     * If the cheapest statement pattern has a cardinality below {@link #MIN_SP_CARDINALITY_LIMIT}, has
     * exactly one variable, the star query has more than two patterns and its common variable is not a
     * constant, that pattern is evaluated first through a Rya Sail connection and its solutions (merged
     * with {@code bindings}) are passed to the remaining patterns. Otherwise the whole star query is sent
     * to the document index.
     *
     * @param bindings the incoming bindings
     * @return the solutions of the star query
     * @throws QueryEvaluationException wrapping any failure raised while evaluating
     */
    @Override
    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings)
            throws QueryEvaluationException {

        final boolean evaluateMinSpFirst = minCard < MIN_SP_CARDINALITY_LIMIT && starQuery.size() > 2
                && numberOfSpVars(minSp) == 1 && !starQuery.commonVarConstant();

        if (!evaluateMinSpFirst) {
            // Query the index directly rather than flipping evalOptUsed and bouncing through
            // evaluate(Collection): same result, no hidden state change on this instance.
            return queryIndex(Collections.singleton(bindings));
        }

        try {
            final Set<BindingSet> seeds = new HashSet<BindingSet>();

            final RdfCloudTripleStoreConnection conn = getRyaSailConnection();
            try {
                @SuppressWarnings("unchecked")
                final CloseableIteration<BindingSet, QueryEvaluationException> sol =
                        (CloseableIteration<BindingSet, QueryEvaluationException>) conn.evaluate(minSp, null,
                                bindings, false);
                try {
                    while (sol.hasNext()) {
                        //TODO this is not optimal - should check if bindings variables intersect minSp variables
                        //creating the following QueryBindingSet is only necessary if no intersection occurs
                        final QueryBindingSet bs = new QueryBindingSet();
                        bs.addAll(sol.next());
                        bs.addAll(bindings);
                        seeds.add(bs);
                    }
                } finally {
                    sol.close();
                }
            } finally {
                // released even when the evaluation above fails
                conn.close();
            }

            // Work on a copy: getNodes() hands out the star query's own list, and removing minSp from it
            // would corrupt this instance's star query for every later evaluation and clone().
            final List<StatementPattern> remaining = new ArrayList<StatementPattern>(starQuery.getNodes());
            remaining.remove(minSp);

            return new EntityTupleSet(new StarQuery(remaining), conf, true).evaluate(seeds);

        } catch (Exception e) {
            throw new QueryEvaluationException(e);
        }
    }

    /**
     * Counts how many of the first three entries of the pattern's variable list are not constants.
     *
     * @param sp the statement pattern to inspect
     * @return the number of non-constant entries among the first three of {@code sp.getVarList()}
     */
    private int numberOfSpVars(StatementPattern sp) {
        List<Var> varList = sp.getVarList();
        int varCount = 0;

        for (int i = 0; i < 3; i++) {
            if (!varList.get(i).isConstant()) {
                varCount++;
            }
        }

        return varCount;
    }

    /**
     * Evaluates the star query for a batch of binding sets.
     * <p>
     * A batch of at most one binding set is routed through {@link #evaluate(BindingSet)} so that the
     * "cheapest pattern first" optimization can be applied, unless this instance was created with
     * {@code evalOptUsed == true}. Everything else is sent to the document index.
     *
     * @param bindingset the incoming binding sets
     * @return the solutions of the star query
     * @throws QueryEvaluationException wrapping any failure raised while evaluating
     */
    @Override
    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final Collection<BindingSet> bindingset)
            throws QueryEvaluationException {

        if (bindingset.size() < 2 && !this.evalOptUsed) {
            final BindingSet bs = bindingset.isEmpty() ? new QueryBindingSet() : bindingset.iterator().next();
            return this.evaluate(bs);
        }
        //TODO possibly refactor if bindingset.size() > 0 to take advantage of optimization in evaluate(BindingSet bindingset)
        return queryIndex(bindingset);
    }

    /**
     * Sends the star query together with the given binding sets to the document index.
     */
    private CloseableIteration<BindingSet, QueryEvaluationException> queryIndex(
            final Collection<BindingSet> bindingset) throws QueryEvaluationException {
        AccumuloDocIdIndexer adi = null;
        try {
            adi = new AccumuloDocIdIndexer(conf);
            return adi.queryDocIndex(starQuery, bindingset);
        } catch (Exception e) {
            throw new QueryEvaluationException(e);
        } finally {
            // NOTE(review): the indexer is closed before the caller consumes the returned iteration;
            // this presumes AccumuloDocIdIndexer.close() does not invalidate it - confirm.
            IOUtils.closeQuietly(adi);
        }
    }

    /**
     * Builds and initializes a new {@link RdfCloudTripleStore} on top of this instance's Accumulo
     * connector and configuration and opens a connection to it.
     */
    private RdfCloudTripleStoreConnection getRyaSailConnection() throws AccumuloException,
            AccumuloSecurityException, SailException {
        // NOTE(review): the store created here is never shut down by this class; only the returned
        // connection is closed by the caller - confirm whether that leaks resources.
        final RdfCloudTripleStore store = new RdfCloudTripleStore();
        AccumuloRyaDAO crdfdao = new AccumuloRyaDAO();
        crdfdao.setConnector(accCon);
        AccumuloRdfConfiguration acc = new AccumuloRdfConfiguration(conf);
        crdfdao.setConf(acc);
        store.setRyaDAO(crdfdao);
        store.initialize();

        return (RdfCloudTripleStoreConnection) store.getConnection();
    }

}
// Patch source: http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/StarQuery.java
// diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/StarQuery.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/StarQuery.java (new file mode 100644, index 0000000..b40beb6)
package mvm.rya.indexing.accumulo.entity;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */


import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import mvm.rya.accumulo.documentIndex.TextColumn;
import mvm.rya.api.domain.RyaType;
import mvm.rya.api.domain.RyaURI;
import mvm.rya.api.resolver.RdfToRyaConversions;
import mvm.rya.api.resolver.RyaContext;
import mvm.rya.api.resolver.RyaTypeResolverException;
import mvm.rya.joinselect.AccumuloSelectivityEvalDAO;

import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.hadoop.io.Text;
import org.openrdf.model.Value;
import org.openrdf.query.BindingSet;
import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.algebra.Var;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.primitives.Bytes;

/**
 * A group of {@link StatementPattern}s that all share one common subject/object {@link Var}, together
 * with the {@link TextColumn} condition derived from each pattern.
 * <p>
 * For every pattern the predicate becomes the column family and the column qualifier starts with either
 * {@code "subject"} or {@code "object"}, optionally followed by a {@code \u0000} separator and a value.
 * {@link #getVarPos()} maps the name of each remaining ("uncommon") variable to the index of the pattern
 * it occurs in.
 */
public class StarQuery {

    /** Name prefix identifying constant {@link Var}s; stripped from the common variable's name. */
    private static final String CONSTANT_VAR_PREFIX = "-const-";

    private final List<StatementPattern> nodes;
    private final TextColumn[] nodeColumnCond;
    private String commonVarName;
    private Var commonVar;
    private Var context;
    private String contextURI = "";
    private final Map<String, Integer> varPos = new HashMap<String, Integer>();
    private boolean isCommonVarURI = false;

    /**
     * Builds a star query from the given statement patterns.
     *
     * @param nodes the patterns of the star query; must not be null or empty and is assumed to form a
     *        valid star query (see {@link #isValidStarQuery(Collection)})
     * @throws IllegalArgumentException if {@code nodes} is null or empty, or if a pattern cannot be
     *         converted into a column condition
     * @throws IllegalStateException if the patterns have no common variable
     */
    public StarQuery(List<StatementPattern> nodes) {
        if (nodes == null || nodes.isEmpty()) {
            throw new IllegalArgumentException("Nodes cannot be empty!");
        }
        // Own the list: getNodes() exposes it, so sharing the caller's (or another StarQuery's) list
        // would let outside modifications desynchronize it from nodeColumnCond and varPos.
        this.nodes = Lists.newArrayList(nodes);
        this.nodeColumnCond = new TextColumn[this.nodes.size()];

        final Var tempContext = this.nodes.get(0).getContextVar();
        this.context = (tempContext != null) ? tempContext.clone() : new Var();

        try {
            init();
        } catch (RyaTypeResolverException e) {
            // A swallowed failure would leave null entries in nodeColumnCond and a partial varPos.
            throw new IllegalArgumentException(
                    "Unable to convert the statement patterns of the star query into column conditions", e);
        }
    }

    /**
     * Builds a star query from a set of statement patterns.
     *
     * @param nodes the patterns of the star query
     * @see #StarQuery(List)
     */
    public StarQuery(Set<StatementPattern> nodes) {
        this(Lists.newArrayList(nodes));
    }

    /**
     * Builds a star query over the same statement patterns as {@code other}; the column conditions are
     * derived again from those patterns.
     *
     * @param other the star query whose patterns are reused
     */
    public StarQuery(StarQuery other) {
        this(other.nodes);
    }

    /** @return the number of statement patterns in this star query */
    public int size() {
        return nodes.size();
    }

    /** @return the statement patterns of this star query (the internal list, not a copy) */
    public List<StatementPattern> getNodes() {
        return nodes;
    }

    /** @return the column condition of each statement pattern, in node order (the internal array) */
    public TextColumn[] getColumnCond() {
        return nodeColumnCond;
    }

    public boolean isCommonVarURI() {
        return isCommonVarURI;
    }

    /**
     * @return the name of the common variable; for a constant common variable this is its name with the
     *         leading constant prefix removed
     */
    public String getCommonVarName() {
        return commonVarName;
    }

    public Var getCommonVar() {
        return commonVar;
    }

    public boolean commonVarHasValue() {
        return commonVar.getValue() != null;
    }

    public boolean commonVarConstant() {
        return commonVar.isConstant();
    }

    /** @return the string form of the common variable's value, or {@code null} if it has no value */
    public String getCommonVarValue() {
        return commonVarHasValue() ? commonVar.getValue().stringValue() : null;
    }

    /** @return the names of the uncommon variables (a live view of the keys of {@link #getVarPos()}) */
    public Set<String> getUnCommonVars() {
        return varPos.keySet();
    }

    /** @return the mapping from uncommon variable name to the index of the pattern it occurs in */
    public Map<String, Integer> getVarPos() {
        return varPos;
    }

    public boolean hasContext() {
        return context.getValue() != null;
    }

    /** @return the context URI of the first pattern, or the empty string when there is no context */
    public String getContextURI() {
        return contextURI;
    }

    /**
     * @return a new set holding the union of the binding names of all statement patterns
     */
    public Set<String> getBindingNames() {
        final Set<String> bindingNames = Sets.newHashSet();
        for (StatementPattern sp : nodes) {
            bindingNames.addAll(sp.getBindingNames());
        }
        return bindingNames;
    }

    /**
     * @return a new set holding the union of the assured binding names of all statement patterns
     */
    public Set<String> getAssuredBindingNames() {
        final Set<String> bindingNames = Sets.newHashSet();
        for (StatementPattern sp : nodes) {
            bindingNames.addAll(sp.getAssuredBindingNames());
        }
        return bindingNames;
    }

    /**
     * Finds the statement pattern with the smallest cardinality according to {@code ase}.
     *
     * @param ase the selectivity DAO used to look up cardinalities
     * @return the cheapest pattern and its cardinality; if no lookup succeeds the pattern is
     *         {@code null} and the cardinality is {@link Double#MAX_VALUE}
     */
    public CardinalityStatementPattern getMinCardSp(AccumuloSelectivityEvalDAO ase) {

        StatementPattern minSp = null;
        double cardinality = Double.MAX_VALUE;

        for (StatementPattern sp : nodes) {
            try {
                final double tempCard = ase.getCardinality(ase.getConf(), sp);
                if (tempCard < cardinality) {
                    cardinality = tempCard;
                    minSp = sp;
                }
            } catch (TableNotFoundException e) {
                // best effort: a pattern whose cardinality cannot be looked up is simply not a candidate
                e.printStackTrace();
            }
        }

        return new CardinalityStatementPattern(minSp, cardinality);
    }

    /**
     * A statement pattern paired with its cardinality.
     */
    public class CardinalityStatementPattern {

        private final StatementPattern sp;
        private final double cardinality;

        public CardinalityStatementPattern(StatementPattern sp, double cardinality) {
            this.sp = sp;
            this.cardinality = cardinality;
        }

        public StatementPattern getSp() {
            return sp;
        }

        public double getCardinality() {
            return cardinality;
        }

    }

    /**
     * Estimates the cardinality of the star query as the smallest pairwise join selectivity of its
     * patterns divided by {@code size() + 1}. The DAO is switched to denormalized mode for the duration
     * of the computation and switched back afterwards.
     *
     * @param ase the selectivity DAO used to compute join selectivities
     * @return the estimate; based on {@link Double#MAX_VALUE} when no selectivity could be computed
     */
    public double getCardinality(AccumuloSelectivityEvalDAO ase) {

        double cardinality = Double.MAX_VALUE;

        ase.setDenormalized(true);
        try {
            for (int i = 0; i < nodes.size(); i++) {
                for (int j = i + 1; j < nodes.size(); j++) {
                    final double tempCard = ase.getJoinSelect(ase.getConf(), nodes.get(i), nodes.get(j));
                    if (tempCard < cardinality) {
                        cardinality = tempCard;
                    }
                }
            }
        } catch (Exception e) {
            // best effort: keep whatever minimum was found before the failure
            e.printStackTrace();
        } finally {
            ase.setDenormalized(false);
        }

        return cardinality / (nodes.size() + 1);
    }

    /**
     * @param query the star query
     * @param bs the binding set, may be {@code null}
     * @return the binding names of {@code bs} that are also variables of {@code query} (its uncommon
     *         variables plus its common variable when that is not a constant); empty when {@code bs} is
     *         {@code null} or empty
     */
    public static Set<String> getCommonVars(StarQuery query, BindingSet bs) {

        if (bs == null || bs.size() == 0) {
            return Sets.newHashSet();
        }

        final Set<String> starQueryVarNames = Sets.newHashSet();
        starQueryVarNames.addAll(query.getUnCommonVars());
        if (!query.commonVarConstant()) {
            starQueryVarNames.add(query.getCommonVarName());
        }

        return Sets.intersection(bs.getBindingNames(), starQueryVarNames);
    }

    /**
     * Applies the values of {@code bs} to {@code query}: the common variable (when not constant) and
     * every uncommon variable bound in {@code bs} get their value written into the query's common
     * {@link Var} and column conditions.
     * <p>
     * The passed query is modified in place and returned.
     *
     * @param query the star query to constrain
     * @param bs the bindings to apply, may be {@code null}
     * @return {@code query}
     * @throws IllegalArgumentException if a bound value cannot be converted into a column condition
     */
    public static StarQuery getConstrainedStarQuery(StarQuery query, BindingSet bs) {

        if (bs == null || bs.size() == 0) {
            return query;
        }

        final Set<String> boundUnCommonVars = Sets.intersection(bs.getBindingNames(), query.getUnCommonVars());

        if (!query.commonVarConstant()) {
            final Value v = bs.getValue(query.getCommonVarName());
            if (v != null) {
                query.commonVar.setValue(v);
            }
        }

        for (String name : boundUnCommonVars) {
            final int pos = query.varPos.get(name);
            try {
                query.nodeColumnCond[pos] = query.setValue(query.nodeColumnCond[pos], bs.getValue(name));
            } catch (RyaTypeResolverException e) {
                // Dropping the constraint silently would return solutions that ignore this binding.
                throw new IllegalArgumentException("Unable to apply the value bound to '" + name
                        + "' to the star query", e);
            }
        }

        return query;
    }

    /**
     * Writes {@code v} into the column qualifier of {@code tc}, keeping its "subject"/"object" direction,
     * and turns off prefix matching for it.
     *
     * @throws IllegalStateException if the qualifier starts with neither "subject" nor "object"
     */
    private TextColumn setValue(TextColumn tc, Value v) throws RyaTypeResolverException {

        final String direction = tc.getColumnQualifier().toString().split("\u0000")[0];

        if (direction.equals("subject")) {
            tc.setColumnQualifier(new Text("subject" + "\u0000" + v.stringValue()));
            tc.setIsPrefix(false);
        } else if (direction.equals("object")) {
            tc.setColumnQualifier(new Text(objectQualifier(v)));
            tc.setIsPrefix(false);
        } else {
            throw new IllegalStateException("Invalid direction!");
        }

        return tc;
    }

    /**
     * Builds the bytes {@code "object" \u0000 <serialized value>} used as the column qualifier of an
     * object condition. The literal parts are encoded explicitly as UTF-8 so the result does not depend
     * on the platform's default charset.
     */
    private static byte[] objectQualifier(Value v) throws RyaTypeResolverException {
        final RyaType objType = RdfToRyaConversions.convertValue(v);
        final byte[][] serialized = RyaContext.getInstance().serializeType(objType);
        return Bytes.concat("object".getBytes(StandardCharsets.UTF_8), "\u0000".getBytes(StandardCharsets.UTF_8),
                serialized[0], serialized[1]);
    }

    /**
     * Converts the i-th statement pattern into its column condition and records the position of its
     * uncommon variable, if any, in {@link #varPos}.
     * <p>
     * Assumes the nodes form a valid star query with a single common variable, that {@link #commonVar}
     * and {@link #commonVarName} have been set, and that the pattern contains at least one variable.
     */
    private TextColumn nodeToTextColumn(StatementPattern node, int i) throws RyaTypeResolverException {

        final Var subjVar = node.getSubjectVar();
        final Var predVar = node.getPredicateVar();
        final Var objVar = node.getObjectVar();

        final RyaURI predURI = (RyaURI) RdfToRyaConversions.convertValue(predVar.getValue());
        final Text columnFamily = new Text(predURI.getData());

        if (subjVar.isConstant()) {
            if (commonVarConstant()) {
                // the constant subject is the common term, so the object is this pattern's variable
                varPos.put(objVar.getName(), i);
                return new TextColumn(columnFamily, new Text("object"));
            }
            return new TextColumn(columnFamily,
                    new Text("subject" + "\u0000" + subjVar.getValue().stringValue()));
        }

        if (objVar.isConstant()) {
            if (commonVarConstant()) {
                // the constant object is the common term, so the subject is this pattern's variable
                varPos.put(subjVar.getName(), i);
                return new TextColumn(columnFamily, new Text("subject"));
            }
            isCommonVarURI = true;
            return new TextColumn(columnFamily, new Text(objectQualifier(objVar.getValue())));
        }

        // both subject and object are variables: one of them is the common variable
        final TextColumn tc;
        if (subjVar.getName().equals(commonVarName)) {
            isCommonVarURI = true;
            varPos.put(objVar.getName(), i);
            tc = new TextColumn(columnFamily, new Text("object"));
        } else {
            varPos.put(subjVar.getName(), i);
            tc = new TextColumn(columnFamily, new Text("subject"));
        }
        tc.setIsPrefix(true);
        return tc;
    }

    /**
     * Determines the common variable, the context URI and the column condition of every node. Called by
     * the constructor once {@link #nodes}, {@link #nodeColumnCond} and {@link #context} are set.
     */
    private void init() throws RyaTypeResolverException {

        commonVar = this.getCommonVar(nodes);
        if (!commonVar.isConstant()) {
            commonVarName = commonVar.getName();
        } else {
            commonVarName = commonVar.getName().substring(CONSTANT_VAR_PREFIX.length());
        }

        if (hasContext()) {
            final RyaURI ctxtURI = (RyaURI) RdfToRyaConversions.convertValue(context.getValue());
            contextURI = ctxtURI.getData();
        }

        for (int i = 0; i < nodes.size(); i++) {
            nodeColumnCond[i] = nodeToTextColumn(nodes.get(i), i);
        }
    }

    /**
     * Returns the {@link Var} that occurs as subject or object of every statement pattern. If more than
     * one such Var exists, a constant one found after the first candidate is preferred, otherwise the
     * first candidate is returned.
     * <p>
     * Assumes {@code nodes} is not empty.
     *
     * @throws IllegalStateException if the patterns have no subject/object Var in common
     */
    private Var getCommonVar(List<StatementPattern> nodes) {

        Set<Var> common = null;

        for (StatementPattern sp : nodes) {
            final Set<Var> endpoints = Sets.newHashSet();
            endpoints.add(sp.getSubjectVar());
            endpoints.add(sp.getObjectVar());

            if (common == null) {
                common = endpoints;
            } else {
                common.retainAll(endpoints);
            }
        }

        if (common.isEmpty()) {
            throw new IllegalStateException("No common Var!");
        }

        Var first = null;
        for (Var v : common) {
            if (first == null) {
                first = v;
            } else if (v.isConstant()) {
                return v;
            }
        }
        return first;
    }

    /**
     * Checks that the names shared by all patterns contain at most one non-constant name, unless there
     * is exactly one shared name. Assumes {@code bindings} is not empty.
     */
    private static boolean isBindingsetValid(Set<String> bindings) {

        if (bindings.size() == 1) {
            return true;
        }

        int varCount = 0;
        for (String s : bindings) {
            if (!s.startsWith(CONSTANT_VAR_PREFIX)) {
                varCount++;
            }
            if (varCount > 1) {
                return false;
            }
        }
        return true;
    }

    /**
     * Checks whether the given statement patterns can be turned into a star query: there are at least
     * two of them, none has a variable context, all have the same context (or none), every predicate is
     * constant, and the patterns share binding names of which at most one is a non-constant name (unless
     * exactly one name is shared).
     *
     * @param nodes the candidate patterns
     * @return {@code true} if the patterns form a valid star query
     */
    public static boolean isValidStarQuery(Collection<StatementPattern> nodes) {

        if (nodes.size() < 2) {
            return false;
        }

        Set<String> shared = null;
        boolean contextSet = false;
        Var context = null;

        for (StatementPattern sp : nodes) {

            final Var tempContext = sp.getContextVar();

            //does not support variable context
            if (tempContext != null && !tempContext.isConstant()) {
                return false;
            }
            if (!contextSet) {
                context = tempContext;
                contextSet = true;
            } else if (!Objects.equals(context, tempContext)) {
                return false;
            }

            if (!sp.getPredicateVar().isConstant()) {
                return false;
            }

            if (shared == null) {
                shared = Sets.newHashSet(sp.getBindingNames());
            } else {
                shared.retainAll(sp.getBindingNames());
            }
            if (shared.isEmpty()) {
                return false;
            }
        }

        return isBindingsetValid(shared);
    }

    @Override
    public String toString() {

        final StringBuilder sb = new StringBuilder("Term conditions: ").append("\n");

        for (TextColumn element : this.nodeColumnCond) {
            sb.append(element).append("\n");
        }

        sb.append("Common Var: ").append(this.commonVar).append("\n");
        sb.append("Context: ").append(this.contextURI);

        return sb.toString();
    }

}

// ---- Header of the next file in this patch (AccumuloFreeTextIndexer.java), preserved verbatim on the following comment line; its content continues after this block. ----
// http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java new file mode 100644 index 0000000..a15ab45 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java @@ -0,0 +1,790 @@ +package mvm.rya.indexing.accumulo.freetext; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + + + +import static mvm.rya.indexing.accumulo.freetext.query.ASTNodeUtils.getNodeIterator; + +import java.io.IOException; +import java.nio.charset.CharacterCodingException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.keyfunctor.ColumnFamilyFunctor; +import org.apache.accumulo.core.iterators.user.IntersectingIterator; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.Validate; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.openrdf.model.Literal; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.query.QueryEvaluationException; + +import com.google.common.base.Charsets; + +import info.aduna.iteration.CloseableIteration; +import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; 
+import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.RyaToRdfConversions; +import mvm.rya.indexing.FreeTextIndexer; +import mvm.rya.indexing.Md5Hash; +import mvm.rya.indexing.StatementConstraints; +import mvm.rya.indexing.StatementSerializer; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.accumulo.freetext.iterators.BooleanTreeIterator; +import mvm.rya.indexing.accumulo.freetext.query.ASTExpression; +import mvm.rya.indexing.accumulo.freetext.query.ASTNodeUtils; +import mvm.rya.indexing.accumulo.freetext.query.ASTSimpleNode; +import mvm.rya.indexing.accumulo.freetext.query.ASTTerm; +import mvm.rya.indexing.accumulo.freetext.query.ParseException; +import mvm.rya.indexing.accumulo.freetext.query.QueryParser; +import mvm.rya.indexing.accumulo.freetext.query.QueryParserTreeConstants; +import mvm.rya.indexing.accumulo.freetext.query.SimpleNode; +import mvm.rya.indexing.accumulo.freetext.query.TokenMgrError; + +/** + * The {@link AccumuloFreeTextIndexer} stores and queries "free text" data from statements into tables in Accumulo. Specifically, this class + * stores data into two different Accumulo Tables. This is the <b>document table</b> (default name: triplestore_text) and the <b>terms + * table</b> (default name: triplestore_terms). + * <p> + * The document table stores the document (i.e. a triple statement), document properties, and the terms within the document. This is the + * main table used for processing a text search by using document partitioned indexing. See {@link IntersectingIterator}. 
+ * <p> + * For each document, the document table will store the following information: + * <P> + * + * <pre> + * Row (partition) | Column Family | Column Qualifier | Value + * ================+================+==================+========== + * shardID | d\x00 | documentHash | Document + * shardID | s\x00Subject | documentHash | (empty) + * shardID | p\x00Predicate | documentHash | (empty) + * shardID | o\x00Object | documentHash | (empty) + * shardID | c\x00Context | documentHash | (empty) + * shardID | t\x00token | documentHash | (empty) + * </pre> + * <p> + * Note: documentHash is a sha256 Hash of the Document's Content + * <p> + * The terms table is used for expanding wildcard search terms. For each token in the document table, the table will store the following + * information: + * + * <pre> + * Row (partition) | CF/CQ/Value + * ==================+============= + * l\x00token | (empty) + * r\x00Reversetoken | (empty) + * </pre> + * <p> + * There are two prefixes in the table, "token list" (keys with an "l" prefix) and "reverse token list" (keys with an "r" prefix). This table + * uses the "token list" to expand foo* into terms like food, foot, and football. This table uses the "reverse token list" to expand *ar + * into car, bar, and far. + * <p> + * Example: Given these three statements as inputs: + * + * <pre> + * <uri:paul> rdfs:label "paul smith"@en <uri:graph1> + * <uri:steve> rdfs:label "steven anthony miller"@en <uri:graph1> + * <uri:steve> rdfs:label "steve miller"@en <uri:graph1> + * </pre> + * <p> + * Here's what the tables would look like: (Note: the hashes aren't real, the rows are not sorted, and the partition ids will vary.) 
+ * <p> + * triplestore_text + * + * <pre> + * Row (partition) | Column Family | Column Qualifier | Value + * ================+=================================+==================+========== + * 000000 | d\x00 | 08b3d233a | uri:graph1\x00uri:paul\x00rdfs:label\x00"paul smith"@en + * 000000 | s\x00uri:paul | 08b3d233a | (empty) + * 000000 | p\x00rdfs:label | 08b3d233a | (empty) + * 000000 | o\x00"paul smith"@en | 08b3d233a | (empty) + * 000000 | c\x00uri:graph1 | 08b3d233a | (empty) + * 000000 | t\x00paul | 08b3d233a | (empty) + * 000000 | t\x00smith | 08b3d233a | (empty) + * + * 000000 | d\x00 | 3a575534b | uri:graph1\x00uri:steve\x00rdfs:label\x00"steven anthony miller"@en + * 000000 | s\x00uri:steve | 3a575534b | (empty) + * 000000 | p\x00rdfs:label | 3a575534b | (empty) + * 000000 | o\x00"steven anthony miller"@en | 3a575534b | (empty) + * 000000 | c\x00uri:graph1 | 3a575534b | (empty) + * 000000 | t\x00steven | 3a575534b | (empty) + * 000000 | t\x00anthony | 3a575534b | (empty) + * 000000 | t\x00miller | 3a575534b | (empty) + * + * 000001 | d\x00 | 7bf670d06 | uri:graph1\x00uri:steve\x00rdfs:label\x00"steve miller"@en + * 000001 | s\x00uri:steve | 7bf670d06 | (empty) + * 000001 | p\x00rdfs:label | 7bf670d06 | (empty) + * 000001 | o\x00"steve miller"@en | 7bf670d06 | (empty) + * 000001 | c\x00uri:graph1 | 7bf670d06 | (empty) + * 000001 | t\x00steve | 7bf670d06 | (empty) + * 000001 | t\x00miller | 7bf670d06 | (empty) + * </pre> + * <p> + * triplestore_terms + * <p> + * + * <pre> + * Row (partition) | CF/CQ/Value + * ==================+============= + * l\x00paul | (empty) + * l\x00smith | (empty) + * l\x00steven | (empty) + * l\x00anthony | (empty) + * l\x00miller | (empty) + * l\x00steve | (empty) + * r\x00luap | (empty) + * r\x00htims | (empty) + * r\x00nevets | (empty) + * r\x00ynohtna | (empty) + * r\x00rellim | (empty) + * r\x00evets | (empty) + * + * </pre> + */ +public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements FreeTextIndexer { + 
private static final String TABLE_SUFFIX_TERM = "freetext_term"; + + private static final String TABLE_SUFFFIX_DOC = "freetext"; + + private static final Logger logger = Logger.getLogger(AccumuloFreeTextIndexer.class); + + private static final boolean IS_TERM_TABLE_TOKEN_DELETION_ENABLED = true; + + private static final byte[] EMPTY_BYTES = new byte[] {}; + private static final Text EMPTY_TEXT = new Text(EMPTY_BYTES); + private static final Value EMPTY_VALUE = new Value(EMPTY_BYTES); + + private Tokenizer tokenizer; + + private BatchWriter docTableBw; + private BatchWriter termTableBw; + private MultiTableBatchWriter mtbw; + + private int queryTermLimit; + + private int docTableNumPartitions; + + private Set<URI> validPredicates; + + private Configuration conf; + + private boolean isInit = false; + + + private void initInternal() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, + TableExistsException { + String doctable = getFreeTextDocTablename(conf); + String termtable = getFreeTextTermTablename(conf); + + docTableNumPartitions = ConfigUtils.getFreeTextDocNumPartitions(conf); + int termTableNumPartitions = ConfigUtils.getFreeTextTermNumPartitions(conf); + + TableOperations tableOps = ConfigUtils.getConnector(conf).tableOperations(); + + // Create term table partitions + boolean createdTermTable = ConfigUtils.createTableIfNotExists(conf, termtable); + if (createdTermTable && !ConfigUtils.useMockInstance(conf) && termTableNumPartitions > 0) { + TreeSet<Text> splits = new TreeSet<Text>(); + + // split on the "Term List" and "Reverse Term list" boundary + splits.add(new Text(ColumnPrefixes.getRevTermListColFam(""))); + + // Symmetrically split the "Term List" and "Reverse Term list" + int numSubpartitions = ((termTableNumPartitions - 1) / 2); + if (numSubpartitions > 0) { + int step = (26 / numSubpartitions); + for (int i = 0; i < numSubpartitions; i++) { + String nextChar = String.valueOf((char) ('a' + (step * i))); + splits.add(new 
Text(ColumnPrefixes.getTermListColFam(nextChar))); + splits.add(new Text(ColumnPrefixes.getRevTermListColFam(nextChar))); + } + } + tableOps.addSplits(termtable, splits); + } + + // Create document (text) table partitions + boolean createdDocTable = ConfigUtils.createTableIfNotExists(conf, doctable); + if (createdDocTable && !ConfigUtils.useMockInstance(conf)) { + TreeSet<Text> splits = new TreeSet<Text>(); + for (int i = 0; i < docTableNumPartitions; i++) { + splits.add(genPartition(i, docTableNumPartitions)); + } + tableOps.addSplits(doctable, splits); + + // Add a tablet level Bloom filter for the Column Family. + // This will allow us to quickly determine if a term is contained in a tablet. + tableOps.setProperty(doctable, "table.bloom.key.functor", ColumnFamilyFunctor.class.getCanonicalName()); + tableOps.setProperty(doctable, "table.bloom.enabled", Boolean.TRUE.toString()); + } + + mtbw = ConfigUtils.createMultitableBatchWriter(conf); + + docTableBw = mtbw.getBatchWriter(doctable); + termTableBw = mtbw.getBatchWriter(termtable); + + tokenizer = ConfigUtils.getFreeTextTokenizer(conf); + validPredicates = ConfigUtils.getFreeTextPredicates(conf); + + queryTermLimit = ConfigUtils.getFreeTextTermLimit(conf); + } + + + //initialization occurs in setConf because index is created using reflection + @Override + public void setConf(Configuration conf) { + this.conf = conf; + if (!isInit) { + try { + initInternal(); + isInit = true; + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException | TableExistsException e) { + logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); + throw new RuntimeException(e); + } + } + } + + @Override + public Configuration getConf() { + return this.conf; + } + + + private void storeStatement(Statement statement) throws IOException { + // if the predicate list is empty, accept all predicates. 
+ // Otherwise, make sure the predicate is on the "valid" list + boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); + + if (isValidPredicate && (statement.getObject() instanceof Literal)) { + + // Get the tokens + String text = statement.getObject().stringValue().toLowerCase(); + SortedSet<String> tokens = tokenizer.tokenize(text); + + if (!tokens.isEmpty()) { + // Get Document Data + String docContent = StatementSerializer.writeStatement(statement); + + String docId = Md5Hash.md5Base64(docContent); + + // Setup partition + Text partition = genPartition(docContent.hashCode(), docTableNumPartitions); + + Mutation docTableMut = new Mutation(partition); + List<Mutation> termTableMutations = new ArrayList<Mutation>(); + + Text docIdText = new Text(docId); + + // Store the Document Data + docTableMut.put(ColumnPrefixes.DOCS_CF_PREFIX, docIdText, new Value(docContent.getBytes(Charsets.UTF_8))); + + // index the statement parts + docTableMut.put(ColumnPrefixes.getSubjColFam(statement), docIdText, EMPTY_VALUE); + docTableMut.put(ColumnPrefixes.getPredColFam(statement), docIdText, EMPTY_VALUE); + docTableMut.put(ColumnPrefixes.getObjColFam(statement), docIdText, EMPTY_VALUE); + docTableMut.put(ColumnPrefixes.getContextColFam(statement), docIdText, EMPTY_VALUE); + + // index the statement terms + for (String token : tokens) { + // tie the token to the document + docTableMut.put(ColumnPrefixes.getTermColFam(token), docIdText, EMPTY_VALUE); + + // store the term in the term table (useful for wildcard searches) + termTableMutations.add(createEmptyPutMutation(ColumnPrefixes.getTermListColFam(token))); + termTableMutations.add(createEmptyPutMutation(ColumnPrefixes.getRevTermListColFam(token))); + } + + // write the mutations + try { + docTableBw.addMutation(docTableMut); + termTableBw.addMutations(termTableMutations); + } catch (MutationsRejectedException e) { + logger.error("error adding mutation", e); + throw new 
IOException(e); + } + + } + + } + } + + @Override + public void storeStatement(RyaStatement statement) throws IOException { + storeStatement(RyaToRdfConversions.convertStatement(statement)); + } + + private static Mutation createEmptyPutMutation(Text row) { + Mutation m = new Mutation(row); + m.put(EMPTY_TEXT, EMPTY_TEXT, EMPTY_VALUE); + return m; + } + + private static Mutation createEmptyPutDeleteMutation(Text row) { + Mutation m = new Mutation(row); + m.putDelete(EMPTY_TEXT, EMPTY_TEXT); + return m; + } + + private static Text genPartition(int partition, int numParitions) { + int length = Integer.toString(numParitions).length(); + return new Text(String.format("%0" + length + "d", Math.abs(partition % numParitions))); + } + + @Override + public Set<URI> getIndexablePredicates() { + return validPredicates; + } + + /** {@inheritDoc} */ + @Override + public void flush() throws IOException { + try { + mtbw.flush(); + } catch (MutationsRejectedException e) { + logger.error("error flushing the batch writer", e); + throw new IOException(e); + } + } + + /** {@inheritDoc} */ + @Override + public void close() throws IOException { + try { + mtbw.close(); + } catch (MutationsRejectedException e) { + logger.error("error closing the batch writer", e); + throw new IOException(e); + } + } + + private Set<String> unrollWildcard(String string, boolean reverse) throws IOException { + Scanner termTableScan = getScanner(getFreeTextTermTablename(conf)); + + Set<String> unrolledTerms = new HashSet<String>(); + + Text queryTerm; + if (reverse) { + String t = StringUtils.removeStart(string, "*").toLowerCase(); + queryTerm = ColumnPrefixes.getRevTermListColFam(t); + } else { + String t = StringUtils.removeEnd(string, "*").toLowerCase(); + queryTerm = ColumnPrefixes.getTermListColFam(t); + } + + // perform query and read results + termTableScan.setRange(Range.prefix(queryTerm)); + + for (Entry<Key, Value> e : termTableScan) { + String term = 
ColumnPrefixes.removePrefix(e.getKey().getRow()).toString(); + if (reverse) { + unrolledTerms.add(StringUtils.reverse(term)); + } else { + unrolledTerms.add(term); + } + } + + if (unrolledTerms.isEmpty()) { + // put in a placeholder term that will never be in the index. + unrolledTerms.add("\1\1\1"); + } + + return unrolledTerms; + } + + private void unrollWildcards(SimpleNode node) throws IOException { + if (node instanceof ASTExpression || node instanceof ASTSimpleNode) { + for (SimpleNode n : getNodeIterator(node)) { + unrollWildcards(n); + } + } else if (node instanceof ASTTerm) { + ASTTerm term = (ASTTerm) node; + boolean isWildTerm = term.getType().equals(ASTTerm.WILDTERM); + boolean isPreWildTerm = term.getType().equals(ASTTerm.PREFIXTERM); + if (isWildTerm || isPreWildTerm) { + Set<String> unrolledTerms = unrollWildcard(term.getTerm(), isPreWildTerm); + + // create a new expression + ASTExpression newExpression = new ASTExpression(QueryParserTreeConstants.JJTEXPRESSION); + newExpression.setType(ASTExpression.OR); + newExpression.setNotFlag(term.isNotFlag()); + + for (String unrolledTerm : unrolledTerms) { + ASTTerm t = new ASTTerm(QueryParserTreeConstants.JJTTERM); + t.setNotFlag(false); + t.setTerm(unrolledTerm); + t.setType(ASTTerm.TERM); + ASTNodeUtils.pushChild(newExpression, t); + } + + // replace "term" node with "expression" node in "term" node parent + SimpleNode parent = (SimpleNode) term.jjtGetParent(); + int index = ASTNodeUtils.getChildIndex(parent, term); + + Validate.isTrue(index >= 0, "child not found in parent"); + + parent.jjtAddChild(newExpression, index); + } + + } else { + throw new IllegalArgumentException("Node is of unknown type: " + node.getClass().getName()); + } + } + + private Scanner getScanner(String tablename) throws IOException { + try { + return ConfigUtils.createScanner(tablename, conf); + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { + logger.error("Error connecting to " + tablename); 
+ throw new IOException(e); + } + } + + /** {@inheritDoc} */ + @Override + public CloseableIteration<Statement, QueryEvaluationException> queryText(String query, StatementConstraints contraints) + throws IOException { + Scanner docTableScan = getScanner(getFreeTextDocTablename(conf)); + + // test the query to see if it's parses correctly. + SimpleNode root = parseQuery(query); + + // unroll any wildcard nodes before it goes to the server + unrollWildcards(root); + + String unrolledQuery = ASTNodeUtils.serializeExpression(root); + + // Add S P O C constraints to query + StringBuilder constrainedQuery = new StringBuilder("(" + unrolledQuery + ")"); + + if (contraints.hasSubject()) { + constrainedQuery.append(" AND "); + constrainedQuery.append(ColumnPrefixes.getSubjColFam(contraints.getSubject().toString()).toString()); + } + if (contraints.hasContext()) { + constrainedQuery.append(" AND "); + constrainedQuery.append(ColumnPrefixes.getContextColFam(contraints.getContext().toString()).toString()); + } + if (contraints.hasPredicates()) { + constrainedQuery.append(" AND ("); + List<String> predicates = new ArrayList<String>(); + for (URI u : contraints.getPredicates()) { + predicates.add(ColumnPrefixes.getPredColFam(u.stringValue()).toString()); + } + constrainedQuery.append(StringUtils.join(predicates, " OR ")); + constrainedQuery.append(")"); + } + + // Verify that the query is a reasonable size + root = parseQuery(constrainedQuery.toString()); + int termCount = ASTNodeUtils.termCount(root); + + if (termCount > queryTermLimit) { + throw new IOException("Query contains too many terms. Term limit: " + queryTermLimit + ". 
Term Count: " + termCount); + } + + // perform query + docTableScan.clearScanIterators(); + docTableScan.clearColumns(); + + int iteratorPriority = 20; + String iteratorName = "booleanTree"; + IteratorSetting ii = new IteratorSetting(iteratorPriority, iteratorName, BooleanTreeIterator.class); + BooleanTreeIterator.setQuery(ii, constrainedQuery.toString()); + docTableScan.addScanIterator(ii); + docTableScan.setRange(new Range()); + + return getIteratorWrapper(docTableScan); + } + + private static CloseableIteration<Statement, QueryEvaluationException> getIteratorWrapper(final Scanner s) { + + final Iterator<Entry<Key, Value>> i = s.iterator(); + + return new CloseableIteration<Statement, QueryEvaluationException>() { + @Override + public boolean hasNext() { + return i.hasNext(); + } + + @Override + public Statement next() throws QueryEvaluationException { + Entry<Key, Value> entry = i.next(); + Value v = entry.getValue(); + try { + String dataString = Text.decode(v.get(), 0, v.getSize()); + Statement s = StatementSerializer.readStatement(dataString); + return s; + } catch (CharacterCodingException e) { + logger.error("Error decoding value", e); + throw new QueryEvaluationException(e); + } catch (IOException e) { + logger.error("Error deserializing statement", e); + throw new QueryEvaluationException(e); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Remove not implemented"); + } + + @Override + public void close() throws QueryEvaluationException { + if (s != null) { + s.close(); + } + } + }; + } + + /** + * Simple adapter that parses the query using {@link QueryParser}. Note: any checked exceptions thrown by {@link QueryParser} are + * re-thrown as {@link IOException}s. 
+ * + * @param query + * @return + * @throws IOException + */ + private static SimpleNode parseQuery(String query) throws IOException { + SimpleNode root = null; + try { + root = QueryParser.parse(query); + } catch (ParseException e) { + logger.error("Parser Exception on Client Side. Query: " + query, e); + throw new IOException(e); + } catch (TokenMgrError e) { + logger.error("Token Manager Exception on Client Side. Query: " + query, e); + throw new IOException(e); + } + return root; + } + + /** + * Get Free Text Document index table's name + * Use the two table version of this below. This one is required by base class. + */ + @Override + public String getTableName() { + return getFreeTextDocTablename(conf); + } + + /** + * Get all the tables used by this index. + * @param conf configuration map + * @return an unmodifiable list of all the table names. + */ + public static List<String> getTableNames(Configuration conf) { + return Collections.unmodifiableList( Arrays.asList( + getFreeTextDocTablename(conf), + getFreeTextTermTablename(conf) )); + } + + /** + * Get the Document index's table name. + * @param conf + * @return the Free Text Document index table's name + */ + public static String getFreeTextDocTablename(Configuration conf) { + return mvm.rya.indexing.accumulo.ConfigUtils.getTablePrefix(conf) + TABLE_SUFFFIX_DOC; + } + + /** + * Get the Term index's table name. + * @param conf + * @return the Free Text Term index table's name + */ + public static String getFreeTextTermTablename(Configuration conf) { + return mvm.rya.indexing.accumulo.ConfigUtils.getTablePrefix(conf) + TABLE_SUFFIX_TERM; + } + + private void deleteStatement(Statement statement) throws IOException { + // if the predicate list is empty, accept all predicates. 
+ // Otherwise, make sure the predicate is on the "valid" list + boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); + + if (isValidPredicate && (statement.getObject() instanceof Literal)) { + + // Get the tokens + String text = statement.getObject().stringValue().toLowerCase(); + SortedSet<String> tokens = tokenizer.tokenize(text); + + if (!tokens.isEmpty()) { + // Get Document Data + String docContent = StatementSerializer.writeStatement(statement); + + String docId = Md5Hash.md5Base64(docContent); + + // Setup partition + Text partition = genPartition(docContent.hashCode(), docTableNumPartitions); + + Mutation docTableMut = new Mutation(partition); + List<Mutation> termTableMutations = new ArrayList<Mutation>(); + + Text docIdText = new Text(docId); + + // Delete the Document Data + docTableMut.putDelete(ColumnPrefixes.DOCS_CF_PREFIX, docIdText); + + // Delete the statement parts in index + docTableMut.putDelete(ColumnPrefixes.getSubjColFam(statement), docIdText); + docTableMut.putDelete(ColumnPrefixes.getPredColFam(statement), docIdText); + docTableMut.putDelete(ColumnPrefixes.getObjColFam(statement), docIdText); + docTableMut.putDelete(ColumnPrefixes.getContextColFam(statement), docIdText); + + + // Delete the statement terms in index + for (String token : tokens) { + if (IS_TERM_TABLE_TOKEN_DELETION_ENABLED) { + int rowId = Integer.parseInt(partition.toString()); + boolean doesTermExistInOtherDocs = doesTermExistInOtherDocs(token, rowId, docIdText); + // Only delete the term from the term table if it doesn't appear in other docs + if (!doesTermExistInOtherDocs) { + // Delete the term in the term table + termTableMutations.add(createEmptyPutDeleteMutation(ColumnPrefixes.getTermListColFam(token))); + termTableMutations.add(createEmptyPutDeleteMutation(ColumnPrefixes.getRevTermListColFam(token))); + } + } + + // Un-tie the token to the document + docTableMut.putDelete(ColumnPrefixes.getTermColFam(token), 
docIdText); + } + + // write the mutations + try { + docTableBw.addMutation(docTableMut); + termTableBw.addMutations(termTableMutations); + } catch (MutationsRejectedException e) { + logger.error("error adding mutation", e); + throw new IOException(e); + } + + } + } + } + + @Override + public void deleteStatement(RyaStatement statement) throws IOException { + deleteStatement(RyaToRdfConversions.convertStatement(statement)); + } + + /** + * Checks to see if the provided term appears in other documents. + * @param term the term to search for. + * @param currentDocId the current document ID that the search term exists in. + * @return {@code true} if the term was found in other documents. {@code false} otherwise. + */ + private boolean doesTermExistInOtherDocs(String term, int currentDocId, Text docIdText) { + try { + String freeTextDocTableName = getFreeTextDocTablename(conf); + Scanner scanner = getScanner(freeTextDocTableName); + + String t = StringUtils.removeEnd(term, "*").toLowerCase(); + Text queryTerm = ColumnPrefixes.getTermColFam(t); + + // perform query and read results + scanner.fetchColumnFamily(queryTerm); + + for (Entry<Key, Value> entry : scanner) { + Key key = entry.getKey(); + Text row = key.getRow(); + int rowId = Integer.parseInt(row.toString()); + // We only want to check other documents from the one we're deleting + if (rowId != currentDocId) { + Text columnFamily = key.getColumnFamily(); + String columnFamilyValue = columnFamily.toString(); + // Check that the value has the term prefix + if (columnFamilyValue.startsWith(ColumnPrefixes.TERM_CF_PREFIX.toString())) { + Text text = ColumnPrefixes.removePrefix(columnFamily); + String value = text.toString(); + if (value.equals(term)) { + return true; + } + } + } + } + } catch (IOException e) { + logger.error("Error searching for the existance of the term in other documents", e); + } + return false; + } + + + @Override + public void init() { + // TODO Auto-generated method stub + + } + + + @Override + 
public void setConnector(Connector connector) { + // TODO Auto-generated method stub + + } + + + @Override + public void destroy() { + // TODO Auto-generated method stub + + } + + + @Override + public void purge(RdfCloudTripleStoreConfiguration configuration) { + // TODO Auto-generated method stub + + } + + + @Override + public void dropAndDestroy() { + // TODO Auto-generated method stub + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/ColumnPrefixes.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/ColumnPrefixes.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/ColumnPrefixes.java new file mode 100644 index 0000000..b33206b --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/ColumnPrefixes.java @@ -0,0 +1,120 @@ +package mvm.rya.indexing.accumulo.freetext; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + + +import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.io.Text; +import org.openrdf.model.Statement; + +import mvm.rya.indexing.StatementSerializer; + +/** + * Row ID: shardId + * <p> + * CF: CF Prefix + Term + */ +public class ColumnPrefixes { + public static final Text DOCS_CF_PREFIX = new Text("d\0"); + public static final Text TERM_CF_PREFIX = new Text("t\0"); + public static final Text TERM_LIST_CF_PREFIX = new Text("l\0"); + public static final Text REVERSE_TERM_LIST_CF_PREFIX = new Text("r\0"); + + public static final Text SUBJECT_CF_PREFIX = new Text("s\0"); + public static final Text PREDICATE_CF_PREFIX = new Text("p\0"); + public static final Text OBJECT_CF_PREFIX = new Text("o\0"); + public static final Text CONTEXT_CF_PREFIX = new Text("c\0"); + + private static Text concat(Text prefix, String str) { + Text temp = new Text(prefix); + + try { + ByteBuffer buffer = Text.encode(str, false); + temp.append(buffer.array(), 0, buffer.limit()); + } catch (CharacterCodingException cce) { + throw new IllegalArgumentException(cce); + } + + return temp; + } + + public static Text getTermColFam(String term) { + return concat(TERM_CF_PREFIX, term); + } + + public static Text getTermListColFam(String term) { + return concat(TERM_LIST_CF_PREFIX, term); + } + + public static Text getRevTermListColFam(String term) { + return concat(REVERSE_TERM_LIST_CF_PREFIX, StringUtils.reverse(term)); + } + + public static Text getDocColFam(String term) { + return concat(DOCS_CF_PREFIX, term); + } + + public static Text getSubjColFam(String term) { + return concat(SUBJECT_CF_PREFIX, term); + } + + public static Text getSubjColFam(Statement statement) { + String subj = StatementSerializer.writeSubject(statement); + return getSubjColFam(subj); + } + + public static Text getPredColFam(String term) { + return concat(PREDICATE_CF_PREFIX, term); + } + + public static Text 
getPredColFam(Statement statement) { + String pred = StatementSerializer.writePredicate(statement); + return getPredColFam(pred); + } + + public static Text getObjColFam(String term) { + return concat(OBJECT_CF_PREFIX, term); + } + + public static Text getObjColFam(Statement statement) { + String obj = StatementSerializer.writeObject(statement); + return getObjColFam(obj); + } + + public static Text getContextColFam(String term) { + return concat(CONTEXT_CF_PREFIX, term); + } + + public static Text getContextColFam(Statement statement) { + String cont = StatementSerializer.writeContext(statement); + return getContextColFam(cont); + } + + public static Text removePrefix(Text termWithPrefix) { + Text temp = new Text(); + temp.set(termWithPrefix.getBytes(), 2, termWithPrefix.getLength() - 2); + return temp; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/FreeTextTupleSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/FreeTextTupleSet.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/FreeTextTupleSet.java new file mode 100644 index 0000000..cfb0f38 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/FreeTextTupleSet.java @@ -0,0 +1,160 @@ +package mvm.rya.indexing.accumulo.freetext; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import info.aduna.iteration.CloseableIteration; + +import java.io.IOException; +import java.util.Set; + +import mvm.rya.indexing.FreeTextIndexer; +import mvm.rya.indexing.IndexingExpr; +import mvm.rya.indexing.IteratorFactory; +import mvm.rya.indexing.SearchFunction; +import mvm.rya.indexing.StatementConstraints; +import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; + +import org.apache.hadoop.conf.Configuration; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.QueryModelVisitor; + +import com.google.common.base.Joiner; + + +//Indexing Node for freetext expressions to be inserted into execution plan +//to delegate freetext portion of query to free text index +public class FreeTextTupleSet extends ExternalTupleSet { + + private Configuration conf; + private FreeTextIndexer freeTextIndexer; + private IndexingExpr filterInfo; + + + public FreeTextTupleSet(IndexingExpr filterInfo, FreeTextIndexer freeTextIndexer) { + this.filterInfo = filterInfo; + this.freeTextIndexer = freeTextIndexer; + this.conf = freeTextIndexer.getConf(); + } + + /** + * {@inheritDoc} + */ + @Override + public Set<String> getBindingNames() { + return filterInfo.getBindingNames(); + } + + /** + * {@inheritDoc} + * <p> + * Note that we need a deep copy for everything that (during optimizations) + * can be altered via {@link #visitChildren(QueryModelVisitor)} + */ + public FreeTextTupleSet clone() { + return new 
FreeTextTupleSet(filterInfo, freeTextIndexer); + } + + @Override + public double cardinality() { + return 0.0; // No idea how to estimate cardinality here. + } + + + + + @Override + public String getSignature() { + + return "(FreeTextTuple Projection) " + "variables: " + Joiner.on(", ").join(this.getBindingNames()).replaceAll("\\s+", " "); + } + + + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + if (!(other instanceof FreeTextTupleSet)) { + return false; + } + + FreeTextTupleSet arg = (FreeTextTupleSet) other; + return this.filterInfo.equals(arg.filterInfo); + } + + + @Override + public int hashCode() { + int result = 17; + result = 31*result + filterInfo.hashCode(); + + return result; + } + + + + /** + * Returns an iterator over the result set of the contained {@link IndexingExpr}. + * <p> + * Should be thread-safe: concurrent invocation of this + * method can be expected with some query evaluators. + * + * @param bindings the bindings handed on to {@link IteratorFactory} together with the query text + * @return the iteration produced by {@link IteratorFactory} + * @throws QueryEvaluationException declared by the overridden method + */ + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) + throws QueryEvaluationException { + + + URI funcURI = filterInfo.getFunction(); + + // adapter that routes the search to the free text index and rethrows its IOException as a QueryEvaluationException + SearchFunction searchFunction = new SearchFunction() { + + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText, + StatementConstraints contraints) throws QueryEvaluationException { + try { + CloseableIteration<Statement, QueryEvaluationException> statements = freeTextIndexer.queryText( + queryText, contraints); + return statements; + } catch (IOException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public String toString() { + return "TEXT"; + }; + }; + + // NOTE(review): the check rejects more than one entry in getArguments() while the message says "two"; + // presumably the bound variable counts as the first argument - confirm against IndexingExpr + if (filterInfo.getArguments().length > 1) { + throw new IllegalArgumentException("Index functions do not support more than two arguments."); + } + + // the first argument is used as the query text + String queryText = filterInfo.getArguments()[0].stringValue(); + + return 
IteratorFactory.getIterator(filterInfo.getSpConstraint(), bindings, queryText, searchFunction); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/LuceneTokenizer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/LuceneTokenizer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/LuceneTokenizer.java new file mode 100644 index 0000000..abda04a --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/LuceneTokenizer.java @@ -0,0 +1,57 @@ +package mvm.rya.indexing.accumulo.freetext; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + + +import java.io.IOException; +import java.io.StringReader; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.TokenStream; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; +import org.apache.lucene.util.Version; + +/** + * A {@link Tokenizer} that delegates to Lucene functions + */ +public class LuceneTokenizer implements Tokenizer { + private static final Analyzer analyzer = new StandardAnalyzer(Version.LUCENE_36); + + /** + * Tokenizes the given text with the shared {@link StandardAnalyzer}. + * + * @param string the text to tokenize + * @return the terms emitted by the analyzer, as a sorted set + */ + @Override + public SortedSet<String> tokenize(String string) { + SortedSet<String> set = new TreeSet<String>(); + try { + TokenStream stream = analyzer.tokenStream(null, new StringReader(string)); + try { + // look the attribute up once; the stream updates it in place on every incrementToken() + CharTermAttribute termAttribute = stream.getAttribute(CharTermAttribute.class); + stream.reset(); + while (stream.incrementToken()) { + set.add(termAttribute.toString()); + } + // complete the TokenStream workflow: end() after the last token, then close() + stream.end(); + } finally { + stream.close(); + } + } catch (IOException e) { + // not thrown b/c we're using a string reader... + throw new RuntimeException(e); + } + + return set; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/SimpleTokenizer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/SimpleTokenizer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/SimpleTokenizer.java new file mode 100644 index 0000000..e98e676 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/SimpleTokenizer.java @@ -0,0 +1,43 @@ +package mvm.rya.indexing.accumulo.freetext; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + + +import java.util.SortedSet; +import java.util.TreeSet; + +/** + * A {@link Tokenizer} that splits on whitespace. + */ +public class SimpleTokenizer implements Tokenizer { + + @Override + public SortedSet<String> tokenize(String sting) { + SortedSet<String> set = new TreeSet<String>(); + for (String token : sting.split("\\s+")) { + String t = token.trim().toLowerCase(); + if (!t.isEmpty()) { + set.add(t); + } + } + return set; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/Tokenizer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/Tokenizer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/Tokenizer.java new file mode 100644 index 0000000..24b40cd --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/Tokenizer.java @@ -0,0 +1,31 @@ +package mvm.rya.indexing.accumulo.freetext; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * A utility that splits a string into tokens.
 */
public interface Tokenizer {
    /**
     * Splits the given text into tokens.
     *
     * @param string the text to split
     * @return the tokens as a sorted set, so each distinct token appears once
     */
    SortedSet<String> tokenize(String string);
}