http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java deleted file mode 100644 index feb894f..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java +++ /dev/null @@ -1,450 +0,0 @@ -package mvm.rya.indexing.accumulo.entity; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - -import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTE; -import static mvm.rya.api.RdfCloudTripleStoreConstants.TYPE_DELIM_BYTE; -import info.aduna.iteration.CloseableIteration; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; -import java.util.NoSuchElementException; -import java.util.Set; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.documentIndex.DocIndexIteratorUtil; -import mvm.rya.accumulo.documentIndex.DocumentIndexIntersectingIterator; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.resolver.RyaContext; -import mvm.rya.api.resolver.RyaToRdfConversions; -import mvm.rya.api.resolver.RyaTypeResolverException; -import mvm.rya.indexing.DocIdIndexer; -import mvm.rya.indexing.accumulo.ConfigUtils; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.openrdf.query.BindingSet; -import org.openrdf.query.MalformedQueryException; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.algebra.StatementPattern; -import org.openrdf.query.algebra.TupleExpr; -import org.openrdf.query.algebra.evaluation.QueryBindingSet; -import org.openrdf.query.algebra.helpers.StatementPatternCollector; 
-import org.openrdf.query.parser.ParsedQuery; -import org.openrdf.query.parser.sparql.SPARQLParser; - -import com.google.common.base.Preconditions; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Sets; -import com.google.common.primitives.Bytes; - -public class AccumuloDocIdIndexer implements DocIdIndexer { - - - - private BatchScanner bs; - private AccumuloRdfConfiguration conf; - - public AccumuloDocIdIndexer(RdfCloudTripleStoreConfiguration conf) throws AccumuloException, AccumuloSecurityException { - Preconditions.checkArgument(conf instanceof RdfCloudTripleStoreConfiguration, "conf must be isntance of RdfCloudTripleStoreConfiguration"); - this.conf = (AccumuloRdfConfiguration) conf; - //Connector conn = ConfigUtils.getConnector(conf); - } - - - - - public CloseableIteration<BindingSet, QueryEvaluationException> queryDocIndex(String sparqlQuery, - Collection<BindingSet> constraints) throws TableNotFoundException, QueryEvaluationException { - - SPARQLParser parser = new SPARQLParser(); - ParsedQuery pq1 = null; - try { - pq1 = parser.parseQuery(sparqlQuery, null); - } catch (MalformedQueryException e) { - e.printStackTrace(); - } - - TupleExpr te1 = pq1.getTupleExpr(); - List<StatementPattern> spList1 = StatementPatternCollector.process(te1); - - if(StarQuery.isValidStarQuery(spList1)) { - StarQuery sq1 = new StarQuery(spList1); - return queryDocIndex(sq1, constraints); - } else { - throw new IllegalArgumentException("Invalid star query!"); - } - - } - - - - - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> queryDocIndex(StarQuery query, - Collection<BindingSet> constraints) throws TableNotFoundException, QueryEvaluationException { - - final StarQuery starQ = query; - final Iterator<BindingSet> bs = constraints.iterator(); - final Iterator<BindingSet> bs2 = constraints.iterator(); - final Set<String> unCommonVarNames; - final Set<String> commonVarNames; - if (bs2.hasNext()) { - BindingSet currBs = 
bs2.next(); - commonVarNames = StarQuery.getCommonVars(query, currBs); - unCommonVarNames = Sets.difference(currBs.getBindingNames(), commonVarNames); - } else { - commonVarNames = Sets.newHashSet(); - unCommonVarNames = Sets.newHashSet(); - } - - if( commonVarNames.size() == 1 && !query.commonVarConstant() && commonVarNames.contains(query.getCommonVarName())) { - - final HashMultimap<String, BindingSet> map = HashMultimap.create(); - final String commonVar = starQ.getCommonVarName(); - final Iterator<Entry<Key, Value>> intersections; - final BatchScanner scan; - Set<Range> ranges = Sets.newHashSet(); - - while(bs.hasNext()) { - - BindingSet currentBs = bs.next(); - - if(currentBs.getBinding(commonVar) == null) { - continue; - } - - String row = currentBs.getBinding(commonVar).getValue().stringValue(); - ranges.add(new Range(row)); - map.put(row, currentBs); - - } - scan = runQuery(starQ, ranges); - intersections = scan.iterator(); - - - return new CloseableIteration<BindingSet, QueryEvaluationException>() { - - - private QueryBindingSet currentSolutionBs = null; - private boolean hasNextCalled = false; - private boolean isEmpty = false; - private Iterator<BindingSet> inputSet = (new ArrayList<BindingSet>()).iterator(); - private BindingSet currentBs; - private Key key; - - - - @Override - public boolean hasNext() throws QueryEvaluationException { - if (!hasNextCalled && !isEmpty) { - while (inputSet.hasNext() || intersections.hasNext()) { - if (!inputSet.hasNext()) { - key = intersections.next().getKey(); - inputSet = map.get(key.getRow().toString()).iterator(); - } - currentBs = inputSet.next(); - currentSolutionBs = deserializeKey(key, starQ, currentBs, unCommonVarNames); - - if (currentSolutionBs.size() == unCommonVarNames.size() + starQ.getUnCommonVars().size() +1) { - hasNextCalled = true; - return true; - } - - } - - isEmpty = true; - return false; - - } else if (isEmpty) { - return false; - } else { - return true; - } - - } - - - @Override - public 
BindingSet next() throws QueryEvaluationException { - - if (hasNextCalled) { - hasNextCalled = false; - } else if (isEmpty) { - throw new NoSuchElementException(); - } else { - if (this.hasNext()) { - hasNextCalled = false; - } else { - throw new NoSuchElementException(); - } - } - - return currentSolutionBs; - } - - @Override - public void remove() throws QueryEvaluationException { - throw new UnsupportedOperationException(); - } - - @Override - public void close() throws QueryEvaluationException { - scan.close(); - } - - }; - - - } else { - - return new CloseableIteration<BindingSet, QueryEvaluationException>() { - - @Override - public void remove() throws QueryEvaluationException { - throw new UnsupportedOperationException(); - } - - private Iterator<Entry<Key, Value>> intersections = null; - private QueryBindingSet currentSolutionBs = null; - private boolean hasNextCalled = false; - private boolean isEmpty = false; - private boolean init = false; - private BindingSet currentBs; - private StarQuery sq = new StarQuery(starQ); - private Set<Range> emptyRangeSet = Sets.newHashSet(); - private BatchScanner scan; - - @Override - public BindingSet next() throws QueryEvaluationException { - if (hasNextCalled) { - hasNextCalled = false; - } else if (isEmpty) { - throw new NoSuchElementException(); - } else { - if (this.hasNext()) { - hasNextCalled = false; - } else { - throw new NoSuchElementException(); - } - } - return currentSolutionBs; - } - - @Override - public boolean hasNext() throws QueryEvaluationException { - - if (!init) { - if (intersections == null && bs.hasNext()) { - currentBs = bs.next(); - sq = StarQuery.getConstrainedStarQuery(sq, currentBs); - scan = runQuery(sq,emptyRangeSet); - intersections = scan.iterator(); - // binding set empty - } else if (intersections == null && !bs.hasNext()) { - currentBs = new QueryBindingSet(); - scan = runQuery(starQ,emptyRangeSet); - intersections = scan.iterator(); - } - - init = true; - } - - if (!hasNextCalled && 
!isEmpty) { - while (intersections.hasNext() || bs.hasNext()) { - if (!intersections.hasNext()) { - scan.close(); - currentBs = bs.next(); - sq = StarQuery.getConstrainedStarQuery(sq, currentBs); - scan = runQuery(sq,emptyRangeSet); - intersections = scan.iterator(); - } - if (intersections.hasNext()) { - currentSolutionBs = deserializeKey(intersections.next().getKey(), sq, currentBs, - unCommonVarNames); - } else { - continue; - } - - if (sq.commonVarConstant() && currentSolutionBs.size() == unCommonVarNames.size() + sq.getUnCommonVars().size()) { - hasNextCalled = true; - return true; - } else if(currentSolutionBs.size() == unCommonVarNames.size() + sq.getUnCommonVars().size() + 1) { - hasNextCalled = true; - return true; - } - } - - isEmpty = true; - return false; - - } else if (isEmpty) { - return false; - } else { - return true; - } - } - - @Override - public void close() throws QueryEvaluationException { - scan.close(); - } - }; - } - } - - private QueryBindingSet deserializeKey(Key key, StarQuery sq, BindingSet currentBs, Set<String> unCommonVar) { - - - QueryBindingSet currentSolutionBs = new QueryBindingSet(); - - Text row = key.getRow(); - Text cq = key.getColumnQualifier(); - - - String[] cqArray = cq.toString().split(DocIndexIteratorUtil.DOC_ID_INDEX_DELIM); - - boolean commonVarSet = false; - - //if common Var is constant there is no common variable to assign a value to - if(sq.commonVarConstant()) { - commonVarSet = true; - } - - if (!commonVarSet && sq.isCommonVarURI()) { - RyaURI rURI = new RyaURI(row.toString()); - currentSolutionBs.addBinding(sq.getCommonVarName(), - RyaToRdfConversions.convertValue(rURI)); - commonVarSet = true; - } - - for (String s : sq.getUnCommonVars()) { - - byte[] cqBytes = cqArray[sq.getVarPos().get(s)].getBytes(); - int firstIndex = Bytes.indexOf(cqBytes, DELIM_BYTE); - int secondIndex = Bytes.lastIndexOf(cqBytes, DELIM_BYTE); - int typeIndex = Bytes.indexOf(cqBytes, TYPE_DELIM_BYTE); - byte[] tripleComponent = 
Arrays.copyOfRange(cqBytes, firstIndex + 1, secondIndex); - byte[] cqContent = Arrays.copyOfRange(cqBytes, secondIndex + 1, typeIndex); - byte[] objType = Arrays.copyOfRange(cqBytes, typeIndex, cqBytes.length); - - if ((new String(tripleComponent)).equals("object")) { - byte[] object = Bytes.concat(cqContent, objType); - org.openrdf.model.Value v = null; - try { - v = RyaToRdfConversions.convertValue(RyaContext.getInstance().deserialize( - object)); - } catch (RyaTypeResolverException e) { - e.printStackTrace(); - } - currentSolutionBs.addBinding(s, v); - - } else if ((new String(tripleComponent)).equals("subject")) { - if (!commonVarSet) { - byte[] object = Bytes.concat(row.getBytes(), objType); - org.openrdf.model.Value v = null; - try { - v = RyaToRdfConversions.convertValue(RyaContext.getInstance().deserialize( - object)); - } catch (RyaTypeResolverException e) { - e.printStackTrace(); - } - currentSolutionBs.addBinding(sq.getCommonVarName(), v); - commonVarSet = true; - } - RyaURI rURI = new RyaURI(new String(cqContent)); - currentSolutionBs.addBinding(s, RyaToRdfConversions.convertValue(rURI)); - } else { - throw new IllegalArgumentException("Invalid row."); - } - } - for (String s : unCommonVar) { - currentSolutionBs.addBinding(s, currentBs.getValue(s)); - } - return currentSolutionBs; - } - - private BatchScanner runQuery(StarQuery query, Collection<Range> ranges) throws QueryEvaluationException { - - try { - if (ranges.size() == 0) { - String rangeText = query.getCommonVarValue(); - Range r; - if (rangeText != null) { - r = new Range(new Text(query.getCommonVarValue())); - } else { - r = new Range(); - } - ranges = Collections.singleton(r); - } - - Connector accCon = ConfigUtils.getConnector(conf); - IteratorSetting is = new IteratorSetting(30, "fii", DocumentIndexIntersectingIterator.class); - - DocumentIndexIntersectingIterator.setColumnFamilies(is, query.getColumnCond()); - - if(query.hasContext()) { - DocumentIndexIntersectingIterator.setContext(is, 
query.getContextURI()); - } - bs = accCon.createBatchScanner(ConfigUtils.getEntityTableName(conf), - new Authorizations(conf.get(ConfigUtils.CLOUDBASE_AUTHS)), 15); - bs.addScanIterator(is); - bs.setRanges(ranges); - - return bs; - - } catch (TableNotFoundException e) { - e.printStackTrace(); - } catch (AccumuloException e) { - e.printStackTrace(); - } catch (AccumuloSecurityException e) { - e.printStackTrace(); - } - throw new QueryEvaluationException(); - } - - - @Override - public void close() throws IOException { - //TODO generate an exception when BS passed in -- scanner closed -// if (bs != null) { -// bs.close(); -// } - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java deleted file mode 100644 index b8b3f65..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java +++ /dev/null @@ -1,252 +0,0 @@ -package mvm.rya.indexing.accumulo.entity; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - -import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV; -import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE; -import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTES; -import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES; -import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT; - -import java.io.IOException; -import java.util.Collection; -import java.util.List; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; -import mvm.rya.accumulo.experimental.AccumuloIndexer; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaType; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.resolver.RdfToRyaConversions; -import mvm.rya.api.resolver.RyaContext; -import mvm.rya.api.resolver.RyaTypeResolverException; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.MultiTableBatchWriter; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; -import org.openrdf.model.Statement; -import org.openrdf.query.algebra.evaluation.QueryOptimizer; -import org.openrdf.query.algebra.evaluation.impl.BindingAssigner; -import org.openrdf.query.algebra.evaluation.impl.CompareOptimizer; -import 
org.openrdf.query.algebra.evaluation.impl.ConjunctiveConstraintSplitter;
import org.openrdf.query.algebra.evaluation.impl.ConstantOptimizer;
import org.openrdf.query.algebra.evaluation.impl.DisjunctiveConstraintOptimizer;
import org.openrdf.query.algebra.evaluation.impl.FilterOptimizer;
import org.openrdf.query.algebra.evaluation.impl.IterativeEvaluationOptimizer;
import org.openrdf.query.algebra.evaluation.impl.OrderLimitOptimizer;
import org.openrdf.query.algebra.evaluation.impl.QueryModelNormalizer;
import org.openrdf.query.algebra.evaluation.impl.SameTermFilterOptimizer;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.primitives.Bytes;

/**
 * Indexer that writes every statement twice into the entity table: once keyed by
 * its subject and once keyed by its object, with the predicate as column family.
 */
public class EntityCentricIndex extends AbstractAccumuloIndexer {

    private static final Logger logger = Logger.getLogger(EntityCentricIndex.class);
    private static final String TABLE_SUFFIX = "EntityCentricIndex";

    private AccumuloRdfConfiguration conf;
    private BatchWriter writer;
    private boolean isInit = false;

    public static final String CONF_TABLE_SUFFIX = "ac.indexer.eci.tablename";


    /** Creates the entity table if it does not exist yet. */
    private void init() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException,
            TableExistsException {
        ConfigUtils.createTableIfNotExists(conf, ConfigUtils.getEntityTableName(conf));
    }


    @Override
    public Configuration getConf() {
        return this.conf;
    }

    /**
     * Stores the configuration and, on the first successful call, creates the index table.
     * Initialization occurs here because the index is created using reflection.
     *
     * @throws RuntimeException wrapping any failure to create the table
     */
    @Override
    public void setConf(Configuration conf) {
        if (conf instanceof AccumuloRdfConfiguration) {
            this.conf = (AccumuloRdfConfiguration) conf;
        } else {
            this.conf = new AccumuloRdfConfiguration(conf);
        }
        if (!isInit) {
            try {
                init();
                isInit = true;
            } catch (AccumuloException e) {
                throw initFailure(e);
            } catch (AccumuloSecurityException e) {
                throw initFailure(e);
            } catch (TableNotFoundException e) {
                throw initFailure(e);
            } catch (TableExistsException e) {
                throw initFailure(e);
            } catch (IOException e) {
                throw initFailure(e);
            }
        }
    }

    /** Logs an initialization failure and wraps it for rethrowing. */
    private static RuntimeException initFailure(Exception cause) {
        logger.warn("Unable to initialize index. Throwing Runtime Exception. ", cause);
        return new RuntimeException(cause);
    }


    @Override
    public String getTableName() {
        return ConfigUtils.getEntityTableName(conf);
    }

    /**
     * Obtains the writer for this index's table from the shared multi-table writer.
     *
     * @throws IOException wrapping any Accumulo failure
     */
    @Override
    public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException {
        try {
            this.writer = writer.getBatchWriter(getTableName());
        } catch (AccumuloException e) {
            throw new IOException(e);
        } catch (AccumuloSecurityException e) {
            throw new IOException(e);
        } catch (TableNotFoundException e) {
            throw new IOException(e);
        }

    }


    /**
     * Adds the index entries of {@code stmt} to the batch writer.
     *
     * @throws NullPointerException if no batch writer has been set
     * @throws IOException if the mutation is rejected or the statement cannot be serialized
     */
    public void storeStatement(RyaStatement stmt) throws IOException {
        Preconditions.checkNotNull(writer, "BatchWriter not Set");
        try {
            for (TripleRow row : serializeStatement(stmt)) {
                writer.addMutation(createMutation(row));
            }
        } catch (MutationsRejectedException e) {
            throw new IOException(e);
        } catch (RyaTypeResolverException e) {
            throw new IOException(e);
        }
    }


    /**
     * Adds delete markers for the index entries of {@code stmt} to the batch writer.
     *
     * @throws NullPointerException if no batch writer has been set
     * @throws IOException if the mutation is rejected or the statement cannot be serialized
     */
    public void deleteStatement(RyaStatement stmt) throws IOException {
        Preconditions.checkNotNull(writer, "BatchWriter not Set");
        try {
            for (TripleRow row : serializeStatement(stmt)) {
                writer.addMutation(deleteMutation(row));
            }
        } catch (MutationsRejectedException e) {
            throw new IOException(e);
        } catch (RyaTypeResolverException e) {
            throw new IOException(e);
        }
    }


    /**
     * Builds the delete mutation for one index row, applying the same defaults for
     * missing family, qualifier and visibility as {@link #createMutation(TripleRow)}.
     */
    protected Mutation deleteMutation(TripleRow tripleRow) {
        Mutation m = new Mutation(new Text(tripleRow.getRow()));

        Text cfText = textOrEmpty(tripleRow.getColumnFamily());
        Text cqText = textOrEmpty(tripleRow.getColumnQualifier());
        ColumnVisibility cv = visibilityOrEmpty(tripleRow.getColumnVisibility());

        Long timestamp = tripleRow.getTimestamp();
        if (timestamp == null) {
            // Unboxing a missing timestamp would throw; let Accumulo assign one instead.
            m.putDelete(cfText, cqText, cv);
        } else {
            m.putDelete(cfText, cqText, cv, timestamp.longValue());
        }
        return m;
    }

    /**
     * Builds the insert mutations (subject-keyed and object-keyed) for a statement.
     *
     * @throws RyaTypeResolverException if the object cannot be serialized
     */
    public static Collection<Mutation> createMutations(RyaStatement stmt) throws RyaTypeResolverException{
        Collection<Mutation> m = Lists.newArrayList();
        for (TripleRow tr : serializeStatement(stmt)){
            m.add(createMutation(tr));
        }
        return m;
    }

    /** Builds the insert mutation for one index row, substituting empty defaults for missing parts. */
    private static Mutation createMutation(TripleRow tripleRow) {
        Mutation mutation = new Mutation(new Text(tripleRow.getRow()));

        Text cfText = textOrEmpty(tripleRow.getColumnFamily());
        Text cqText = textOrEmpty(tripleRow.getColumnQualifier());
        ColumnVisibility cv = visibilityOrEmpty(tripleRow.getColumnVisibility());
        byte[] value = tripleRow.getValue();
        Value v = value == null ? EMPTY_VALUE : new Value(value);

        Long timestamp = tripleRow.getTimestamp();
        if (timestamp == null) {
            // Unboxing a missing timestamp would throw; let Accumulo assign one instead.
            mutation.put(cfText, cqText, cv, v);
        } else {
            mutation.put(cfText, cqText, cv, timestamp.longValue(), v);
        }
        return mutation;
    }

    private static Text textOrEmpty(byte[] bytes) {
        return bytes == null ? EMPTY_TEXT : new Text(bytes);
    }

    private static ColumnVisibility visibilityOrEmpty(byte[] expression) {
        return expression == null ? EMPTY_CV : new ColumnVisibility(expression);
    }

    /**
     * Produces the two index rows of a statement:
     * <ul>
     * <li>row = subject, family = predicate, qualifier = context, "object", object data, object type</li>
     * <li>row = object data, family = predicate, qualifier = context, "subject", subject, object type</li>
     * </ul>
     * Qualifier parts are separated by {@code DELIM_BYTES} as shown in the code below.
     *
     * @throws NullPointerException if the statement lacks a subject, predicate or object
     * @throws RyaTypeResolverException if the object cannot be serialized
     */
    private static List<TripleRow> serializeStatement(RyaStatement stmt) throws RyaTypeResolverException {
        RyaURI subject = stmt.getSubject();
        RyaURI predicate = stmt.getPredicate();
        RyaType object = stmt.getObject();
        RyaURI context = stmt.getContext();
        Long timestamp = stmt.getTimestamp();
        byte[] columnVisibility = stmt.getColumnVisibility();
        byte[] value = stmt.getValue();

        // Explicit checks: an assert is skipped unless the JVM runs with -ea.
        Preconditions.checkNotNull(subject, "statement subject must not be null");
        Preconditions.checkNotNull(predicate, "statement predicate must not be null");
        Preconditions.checkNotNull(object, "statement object must not be null");

        // NOTE(review): getBytes() uses the platform default charset - confirm it matches the readers.
        byte[] cf = (context == null) ? EMPTY_BYTES : context.getData().getBytes();
        byte[] subjBytes = subject.getData().getBytes();
        byte[] predBytes = predicate.getData().getBytes();
        byte[][] objBytes = RyaContext.getInstance().serializeType(object);

        TripleRow subjectKeyed = new TripleRow(subjBytes,
                predBytes,
                Bytes.concat(cf, DELIM_BYTES,
                        "object".getBytes(), DELIM_BYTES,
                        objBytes[0], objBytes[1]),
                timestamp,
                columnVisibility,
                value);

        TripleRow objectKeyed = new TripleRow(objBytes[0],
                predBytes,
                Bytes.concat(cf, DELIM_BYTES,
                        "subject".getBytes(), DELIM_BYTES,
                        subjBytes, objBytes[1]),
                timestamp,
                columnVisibility,
                value);

        return Lists.newArrayList(subjectKeyed, objectKeyed);
    }


}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java deleted file mode 100644 index 2030e58..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java +++ /dev/null @@ -1,171 +0,0 @@ -package mvm.rya.indexing.accumulo.entity; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */


import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Set;
import mvm.rya.indexing.accumulo.ConfigUtils;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

/**
 * Reads the predicates recorded in the {@code <prefix>prospects} table and defines
 * one locality group per predicate on the {@code <prefix>doc_partitioned_index} table.
 */
public class EntityLocalityGroupSetter {


    String tablePrefix;
    Connector conn;
    Configuration conf;

    /**
     * @param tablePrefix prefix prepended to the "prospects" and "doc_partitioned_index" table names
     * @param conn connector used for scanning and for the table operation
     * @param conf configuration supplying the scan authorizations
     */
    public EntityLocalityGroupSetter(String tablePrefix, Connector conn, Configuration conf) {
        this.conn = conn;
        this.tablePrefix = tablePrefix;
        this.conf = conf;
    }

    /**
     * Collects the distinct predicates found under the "predicate\u0000" row prefix
     * of the prospects table. The scanner is fully read and closed before returning.
     *
     * @return read-only iterator over the predicates, in the order first seen
     * @throws IllegalStateException if the prospects table does not exist
     */
    private Iterator<String> getPredicates() {

        String auths = conf.get(ConfigUtils.CLOUDBASE_AUTHS);
        final BatchScanner bs;
        try {
            bs = conn.createBatchScanner(tablePrefix + "prospects", new Authorizations(auths), 10);
        } catch (TableNotFoundException e) {
            // Carrying on would only dereference a null scanner on the next line.
            throw new IllegalStateException("Table " + tablePrefix + "prospects does not exist", e);
        }

        // De-duplicated: a predicate read twice would otherwise end up in two locality groups.
        Set<String> predicates = new LinkedHashSet<String>();
        try {
            bs.setRanges(Collections.singleton(Range.prefix(new Text("predicate" + "\u0000"))));
            Iterator<Entry<Key,Value>> iter = bs.iterator();
            while (iter.hasNext()) {
                String row = iter.next().getKey().getRow().toString();
                String[] rowArray = row.split("\u0000");
                // A row consisting of the bare prefix splits into a single element; skip it.
                if (rowArray.length > 1) {
                    predicates.add(rowArray[1]);
                }
            }
        } finally {
            bs.close();
        }

        return Collections.unmodifiableSet(predicates).iterator();
    }

    /**
     * Assigns every predicate its own locality group, named "predicate1",
     * "predicate2", ..., on the doc-partitioned index table.
     *
     * <p>A failure of the table operation itself is reported on standard error and
     * otherwise ignored, so callers are not interrupted by it.
     *
     * @throws IllegalStateException if the prospects table does not exist
     */
    public void setLocalityGroups() {

        HashMap<String, Set<Text>> localityGroups = new HashMap<String, Set<Text>>();
        Iterator<String> predicates = getPredicates();

        int i = 1;

        while (predicates.hasNext()) {
            HashSet<Text> columnFamilies = new HashSet<Text>();
            columnFamilies.add(new Text(predicates.next()));
            localityGroups.put("predicate" + i, columnFamilies);
            i++;
        }

        try {
            conn.tableOperations().setLocalityGroups(tablePrefix + "doc_partitioned_index", localityGroups);
        } catch (AccumuloException e) {
            e.printStackTrace();
        } catch (AccumuloSecurityException e) {
            e.printStackTrace();
        } catch (TableNotFoundException e) {
            e.printStackTrace();
        }

    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityOptimizer.java
---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityOptimizer.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityOptimizer.java deleted file mode 100644 index e46c321..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityOptimizer.java +++ /dev/null @@ -1,436 +0,0 @@ -package mvm.rya.indexing.accumulo.entity; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Set; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.persist.RdfEvalStatsDAO; -import mvm.rya.api.persist.joinselect.SelectivityEvalDAO; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.joinselect.AccumuloSelectivityEvalDAO; -import mvm.rya.prospector.service.ProspectorServiceEvalStatsDAO; -import mvm.rya.rdftriplestore.inference.DoNotExpandSP; -import mvm.rya.rdftriplestore.utils.FixedStatementPattern; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.openrdf.query.BindingSet; -import org.openrdf.query.Dataset; -import org.openrdf.query.algebra.Filter; -import org.openrdf.query.algebra.Join; -import org.openrdf.query.algebra.QueryModelNode; -import org.openrdf.query.algebra.StatementPattern; -import org.openrdf.query.algebra.TupleExpr; -import org.openrdf.query.algebra.Var; -import org.openrdf.query.algebra.evaluation.QueryOptimizer; -import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -public class EntityOptimizer implements QueryOptimizer, Configurable { - - private SelectivityEvalDAO<RdfCloudTripleStoreConfiguration> eval; - private RdfCloudTripleStoreConfiguration conf; - private boolean isEvalDaoSet = false; - - - public EntityOptimizer() { - - } - - public EntityOptimizer(RdfCloudTripleStoreConfiguration conf) { - if(conf.isUseStats() && conf.isUseSelectivity()) { - try { - eval = new AccumuloSelectivityEvalDAO(conf, ConfigUtils.getConnector(conf)); - ((AccumuloSelectivityEvalDAO)eval).setRdfEvalDAO(new 
ProspectorServiceEvalStatsDAO(ConfigUtils.getConnector(conf), conf)); - eval.init(); - } catch (AccumuloException e) { - e.printStackTrace(); - } catch (AccumuloSecurityException e) { - e.printStackTrace(); - } - - isEvalDaoSet = true; - } else { - eval = null; - isEvalDaoSet = true; - } - this.conf = conf; - } - - public EntityOptimizer(SelectivityEvalDAO<RdfCloudTripleStoreConfiguration> eval) { - this.eval = eval; - this.conf = eval.getConf(); - isEvalDaoSet = true; - } - - @Override - public void setConf(Configuration conf) { - if(conf instanceof RdfCloudTripleStoreConfiguration) { - this.conf = (RdfCloudTripleStoreConfiguration) conf; - } else { - this.conf = new AccumuloRdfConfiguration(conf); - } - - if (!isEvalDaoSet) { - if(this.conf.isUseStats() && this.conf.isUseSelectivity()) { - try { - eval = new AccumuloSelectivityEvalDAO(this.conf, ConfigUtils.getConnector(this.conf)); - ((AccumuloSelectivityEvalDAO)eval).setRdfEvalDAO(new ProspectorServiceEvalStatsDAO(ConfigUtils.getConnector(this.conf), this.conf)); - eval.init(); - } catch (AccumuloException e) { - e.printStackTrace(); - } catch (AccumuloSecurityException e) { - e.printStackTrace(); - } - - isEvalDaoSet = true; - } else { - eval = null; - isEvalDaoSet = true; - } - } - - } - - @Override - public Configuration getConf() { - return conf; - } - - /** - * Applies generally applicable optimizations: path expressions are sorted - * from more to less specific. 
- * - * @param tupleExpr - */ - @Override - public void optimize(TupleExpr tupleExpr, Dataset dataset, BindingSet bindings) { - tupleExpr.visit(new JoinVisitor()); - } - - protected class JoinVisitor extends QueryModelVisitorBase<RuntimeException> { - - @Override - public void meet(Join node) { - try { - if (node.getLeftArg() instanceof FixedStatementPattern && node.getRightArg() instanceof DoNotExpandSP) { - return; - } - List<TupleExpr> joinArgs = getJoinArgs(node, new ArrayList<TupleExpr>()); - HashMultimap<String, StatementPattern> varMap = getVarBins(joinArgs); - while (!varMap.keySet().isEmpty()) { - String s = getHighestPriorityKey(varMap); - constructTuple(varMap, joinArgs, s); - } - List<TupleExpr> filterChain = getFilterChain(joinArgs); - - for (TupleExpr te : joinArgs) { - if (!(te instanceof StatementPattern) || !(te instanceof EntityTupleSet)) { - te.visit(this); - } - } - // Replace old join hierarchy - node.replaceWith(getNewJoin(joinArgs, filterChain)); - - } catch (Exception e) { - e.printStackTrace(); - } - } - - private List<TupleExpr> getFilterChain(List<TupleExpr> joinArgs) { - List<TupleExpr> filterTopBottom = Lists.newArrayList(); - TupleExpr filterChainTop = null; - TupleExpr filterChainBottom = null; - - for(int i = 0; i < joinArgs.size(); i++) { - if(joinArgs.get(i) instanceof Filter) { - if(filterChainTop == null) { - filterChainTop = joinArgs.remove(i); - i--; - } else if(filterChainBottom == null){ - filterChainBottom = joinArgs.remove(i); - ((Filter)filterChainTop).setArg(filterChainBottom); - i--; - } else { - ((Filter)filterChainBottom).setArg(joinArgs.remove(i)); - filterChainBottom = ((Filter)filterChainBottom).getArg(); - i--; - } - } - } - if(filterChainTop != null) { - filterTopBottom.add(filterChainTop); - } - if(filterChainBottom != null) { - filterTopBottom.add(filterChainBottom); - } - return filterTopBottom; - } - - private TupleExpr getNewJoin(List<TupleExpr> joinArgs, List<TupleExpr> filterChain) { - TupleExpr newJoin; - 
- if (joinArgs.size() > 1) { - if (filterChain.size() > 0) { - TupleExpr finalJoinArg = joinArgs.remove(0); - TupleExpr tempJoin; - TupleExpr temp = filterChain.get(0); - - if (joinArgs.size() > 1) { - tempJoin = new Join(joinArgs.remove(0), joinArgs.remove(0)); - for (TupleExpr te : joinArgs) { - tempJoin = new Join(tempJoin, te); - } - } else { - tempJoin = joinArgs.remove(0); - } - - if (filterChain.size() == 1) { - ((Filter) temp).setArg(tempJoin); - } else { - ((Filter) filterChain.get(1)).setArg(tempJoin); - } - newJoin = new Join(temp, finalJoinArg); - } else { - newJoin = new Join(joinArgs.get(0), joinArgs.get(1)); - joinArgs.remove(0); - joinArgs.remove(0); - - for (TupleExpr te : joinArgs) { - newJoin = new Join(newJoin, te); - } - } - } else if (joinArgs.size() == 1) { - if (filterChain.size() > 0) { - newJoin = filterChain.get(0); - if (filterChain.size() == 1) { - ((Filter) newJoin).setArg(joinArgs.get(0)); - } else { - ((Filter) filterChain.get(1)).setArg(joinArgs.get(0)); - } - } else { - newJoin = joinArgs.get(0); - } - } else { - throw new IllegalStateException("JoinArgs size cannot be zero."); - } - return newJoin; - } - - private HashMultimap<String, StatementPattern> getVarBins(List<TupleExpr> nodes) { - - HashMultimap<String, StatementPattern> varMap = HashMultimap.create(); - - for (QueryModelNode node : nodes) { - if (node instanceof StatementPattern) { - StatementPattern sp = (StatementPattern) node; - if (sp.getPredicateVar().isConstant()) { - varMap.put(sp.getSubjectVar().getName(), sp); - varMap.put(sp.getObjectVar().getName(), sp); - } - } - } - - removeInvalidBins(varMap, true); - - return varMap; - } - - private void updateVarMap(HashMultimap<String, StatementPattern> varMap, Set<StatementPattern> bin) { - - for (StatementPattern sp : bin) { - varMap.remove(sp.getSubjectVar().getName(), sp); - varMap.remove(sp.getObjectVar().getName(), sp); - } - - removeInvalidBins(varMap, false); - - } - - private void 
removeInvalidBins(HashMultimap<String, StatementPattern> varMap, boolean newMap) { - - Set<String> keys = Sets.newHashSet(varMap.keySet()); - - if (newMap) { - for (String s : keys) { - Set<StatementPattern> spSet = Sets.newHashSet(varMap.get(s)); - if (!StarQuery.isValidStarQuery(spSet)) { - for (StatementPattern sp : spSet) { - varMap.remove(s, sp); - } - } - - } - } else { - - for (String s : keys) { - Set<StatementPattern> spSet = Sets.newHashSet(varMap.get(s)); - if (spSet.size() == 1) { - for (StatementPattern sp : spSet) { - varMap.remove(s, sp); - } - } - - } - } - - } - - private void constructTuple(HashMultimap<String, StatementPattern> varMap, List<TupleExpr> joinArgs, - String binName) { - - Set<StatementPattern> bin = Sets.newHashSet(varMap.get(binName)); - StarQuery sq = new StarQuery(bin); - - updateVarMap(varMap, bin); - for (StatementPattern sp : bin) { - joinArgs.remove(sp); - } - - joinArgs.add(new EntityTupleSet(sq, conf)); - - } - - private String getHighestPriorityKey(HashMultimap<String, StatementPattern> varMap) { - - double tempPriority = -1; - double priority = -Double.MAX_VALUE; - String priorityKey = ""; - Set<StatementPattern> bin = null; - - Set<String> keys = varMap.keySet(); - - for (String s : keys) { - bin = varMap.get(s); - tempPriority = bin.size(); - tempPriority *= getCardinality(bin); - tempPriority *= getMinCardSp(bin); - - // weight starQuery where common Var is constant slightly more -- this factor is subject - // to change - if(s.startsWith("-const-")) { - tempPriority *= 10; - } - if (tempPriority > priority) { - priority = tempPriority; - priorityKey = s; - } - } - return priorityKey; - } - - private double getMinCardSp(Collection<StatementPattern> nodes) { - - double cardinality = Double.MAX_VALUE; - double tempCard = -1; - - if (eval == null) { - return 1; - } - - for (StatementPattern sp : nodes) { - - try { - tempCard = eval.getCardinality(conf, sp); - - if (tempCard < cardinality) { - cardinality = tempCard; - - } - 
} catch (Exception e) { - e.printStackTrace(); - } - - } - - return cardinality; - - } - - private double getCardinality(Collection<StatementPattern> spNodes) { - - double cardinality = Double.MAX_VALUE; - double tempCard = -1; - - - if(eval == null) { - return 1; - } - - List<StatementPattern> nodes = Lists.newArrayList(spNodes); - - AccumuloSelectivityEvalDAO ase = (AccumuloSelectivityEvalDAO) eval; - ase.setDenormalized(true); - - try { - - for (int i = 0; i < nodes.size(); i++) { - for (int j = i + 1; j < nodes.size(); j++) { - tempCard = ase.getJoinSelect(conf, nodes.get(i), nodes.get(j)); - if (tempCard < cardinality) { - cardinality = tempCard; - } - } - } - - } catch (Exception e) { - e.printStackTrace(); - } - - ase.setDenormalized(false); - - return cardinality / (nodes.size() + 1); - - } - - protected <L extends List<TupleExpr>> L getJoinArgs(TupleExpr tupleExpr, L joinArgs) { - if (tupleExpr instanceof Join) { - if (!(((Join) tupleExpr).getLeftArg() instanceof FixedStatementPattern) - && !(((Join) tupleExpr).getRightArg() instanceof DoNotExpandSP)) { - Join join = (Join) tupleExpr; - getJoinArgs(join.getLeftArg(), joinArgs); - getJoinArgs(join.getRightArg(), joinArgs); - } - } else if(tupleExpr instanceof Filter) { - joinArgs.add(tupleExpr); - getJoinArgs(((Filter)tupleExpr).getArg(), joinArgs); - } else { - joinArgs.add(tupleExpr); - } - - return joinArgs; - } - - } - - - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityTupleSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityTupleSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityTupleSet.java deleted file mode 100644 index dbe7a53..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityTupleSet.java +++ /dev/null @@ -1,264 +0,0 @@ -package 
mvm.rya.indexing.accumulo.entity; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import info.aduna.iteration.CloseableIteration; - -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.AccumuloRyaDAO; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.accumulo.entity.StarQuery.CardinalityStatementPattern; -import mvm.rya.joinselect.AccumuloSelectivityEvalDAO; -import mvm.rya.prospector.service.ProspectorServiceEvalStatsDAO; -import mvm.rya.rdftriplestore.RdfCloudTripleStore; -import mvm.rya.rdftriplestore.RdfCloudTripleStoreConnection; -import mvm.rya.rdftriplestore.evaluation.ExternalBatchingIterator; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.commons.io.IOUtils; -import org.openrdf.query.BindingSet; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.algebra.StatementPattern; -import 
org.openrdf.query.algebra.Var; -import org.openrdf.query.algebra.evaluation.QueryBindingSet; -import org.openrdf.query.algebra.evaluation.impl.ExternalSet; -import org.openrdf.sail.SailException; - -import com.beust.jcommander.internal.Sets; -import com.google.common.base.Joiner; - -public class EntityTupleSet extends ExternalSet implements ExternalBatchingIterator { - - - private StarQuery starQuery; - private RdfCloudTripleStoreConfiguration conf; - private Set<String> variables; - private double cardinality = -1; - private StatementPattern minSp; - private double minCard; - private Connector accCon = null; - private boolean evalOptUsed = false; - - public EntityTupleSet() { - - } - - public EntityTupleSet(StarQuery sq, RdfCloudTripleStoreConfiguration conf) { - this.starQuery = sq; - this.conf = conf; - - variables = Sets.newHashSet(); - if(!starQuery.commonVarConstant()) { - variables.add(starQuery.getCommonVarName()); - } - variables.addAll(starQuery.getUnCommonVars()); - - init(); - - } - - public EntityTupleSet(StarQuery sq, RdfCloudTripleStoreConfiguration conf, boolean evalOptUsed) { - this(sq,conf); - this.evalOptUsed = evalOptUsed; - } - - private void init() { - - try { - accCon = ConfigUtils.getConnector(conf); - } catch (AccumuloException e) { - e.printStackTrace(); - } catch (AccumuloSecurityException e) { - e.printStackTrace(); - } - if (conf.isUseStats() && conf.isUseSelectivity()) { - - ProspectorServiceEvalStatsDAO evalDao = new ProspectorServiceEvalStatsDAO(accCon, conf); - evalDao.init(); - AccumuloSelectivityEvalDAO ase = new AccumuloSelectivityEvalDAO(conf, accCon); - ase.setRdfEvalDAO(evalDao); - ase.init(); - - cardinality = starQuery.getCardinality(ase); - CardinalityStatementPattern csp = starQuery.getMinCardSp(ase); - - minCard = csp.getCardinality(); - minSp = csp.getSp(); - } else { - // TODO come up with a better default if cardinality is not - // initialized - cardinality = minCard = 1; - minSp = starQuery.getNodes().get(0); - } - - 
} - - @Override - public Set<String> getBindingNames() { - return starQuery.getBindingNames(); - } - - @Override - public Set<String> getAssuredBindingNames() { - return starQuery.getAssuredBindingNames(); - } - - public Set<String> getVariables() { - return variables; - } - - - @Override - public String getSignature() { - return "(EntityCentric Projection) " + " common Var: " + starQuery.getCommonVarName() + " variables: " + Joiner.on(", ").join(variables).replaceAll("\\s+", " "); - } - - public StarQuery getStarQuery() { - return starQuery; - } - - public void setStarQuery(StarQuery sq) { - this.starQuery = sq; - } - - - @Override - public EntityTupleSet clone() { - StarQuery sq = new StarQuery(starQuery); - return new EntityTupleSet(sq, conf); - } - - - @Override - public double cardinality() { - return cardinality; - } - - - public double getMinSpCard() { - return minCard; - } - - - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) throws QueryEvaluationException { - - // if starQuery contains node with cardinality less than 1000 and node - // only has one variable, and number of SPs in starQuery is greater than 2, it is - // more efficient to first evaluate this node and then pass the bindings - // into the remainder of the star query to be evaluated - if (minCard < 1000 && starQuery.size() > 2 && numberOfSpVars(minSp) == 1 && !starQuery.commonVarConstant()) { - - try { - RdfCloudTripleStoreConnection conn = getRyaSailConnection(); - CloseableIteration<BindingSet, QueryEvaluationException> sol = (CloseableIteration<BindingSet, QueryEvaluationException>) conn - .evaluate(minSp, null, bindings, false); - - Set<BindingSet> bSet = Sets.newHashSet(); - while (sol.hasNext()) { - //TODO this is not optimal - should check if bindings variables intersect minSp variables - //creating the following QueryBindingSet is only necessary if no intersection occurs - QueryBindingSet bs = new QueryBindingSet(); - 
bs.addAll(sol.next()); - bs.addAll(bindings); - bSet.add(bs); - } - - List<StatementPattern> spList = starQuery.getNodes(); - spList.remove(minSp); - - StarQuery sq = new StarQuery(spList); - conn.close(); - - return (new EntityTupleSet(sq, conf, true)).evaluate(bSet); - - } catch (Exception e) { - throw new QueryEvaluationException(e); - } - } else { - this.evalOptUsed = true; - return this.evaluate(Collections.singleton(bindings)); - } - - } - - - private int numberOfSpVars(StatementPattern sp) { - List<Var> varList = sp.getVarList(); - int varCount = 0; - - for(int i = 0; i < 3; i++) { - if(!varList.get(i).isConstant()) { - varCount++; - } - } - - return varCount; - } - - - @Override - public CloseableIteration<BindingSet,QueryEvaluationException> evaluate(final Collection<BindingSet> bindingset) throws QueryEvaluationException { - - if(bindingset.size() < 2 && !this.evalOptUsed) { - BindingSet bs = new QueryBindingSet(); - if (bindingset.size() == 1) { - bs = bindingset.iterator().next(); - } - return this.evaluate(bs); - } - //TODO possibly refactor if bindingset.size() > 0 to take advantage of optimization in evaluate(BindingSet bindingset) - AccumuloDocIdIndexer adi = null; - try { - adi = new AccumuloDocIdIndexer(conf); - return adi.queryDocIndex(starQuery, bindingset); - } catch (Exception e) { - throw new QueryEvaluationException(e); - } finally { - IOUtils.closeQuietly(adi); - } - } - - - private RdfCloudTripleStoreConnection getRyaSailConnection() throws AccumuloException, - AccumuloSecurityException, SailException { - final RdfCloudTripleStore store = new RdfCloudTripleStore(); - AccumuloRyaDAO crdfdao = new AccumuloRyaDAO(); - crdfdao.setConnector(accCon); - AccumuloRdfConfiguration acc = new AccumuloRdfConfiguration(conf); - crdfdao.setConf(acc); - store.setRyaDAO(crdfdao); - store.initialize(); - - return (RdfCloudTripleStoreConnection) store.getConnection(); - } - - -} 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/StarQuery.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/StarQuery.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/StarQuery.java deleted file mode 100644 index e9d2f85..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/StarQuery.java +++ /dev/null @@ -1,636 +0,0 @@ -package mvm.rya.indexing.accumulo.entity; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import mvm.rya.accumulo.documentIndex.TextColumn; -import mvm.rya.api.domain.RyaType; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.persist.joinselect.SelectivityEvalDAO; -import mvm.rya.api.resolver.RdfToRyaConversions; -import mvm.rya.api.resolver.RyaContext; -import mvm.rya.api.resolver.RyaTypeResolverException; -import mvm.rya.joinselect.AccumuloSelectivityEvalDAO; - -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.hadoop.io.Text; -import org.openrdf.model.Value; -import org.openrdf.query.BindingSet; -import org.openrdf.query.algebra.StatementPattern; -import org.openrdf.query.algebra.Var; - -import com.beust.jcommander.internal.Maps; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.common.primitives.Bytes; - -public class StarQuery { - - private List<StatementPattern> nodes; - private TextColumn[] nodeColumnCond; - private String commonVarName; - private Var commonVar; - private Var context; - private String contextURI =""; - private Map<String,Integer> varPos = Maps.newHashMap(); - private boolean isCommonVarURI = false; - - - public StarQuery(List<StatementPattern> nodes) { - this.nodes = nodes; - if(nodes.size() == 0) { - throw new IllegalArgumentException("Nodes cannot be empty!"); - } - nodeColumnCond = new TextColumn[nodes.size()]; - Var tempContext = (Var) nodes.get(0).getContextVar(); - if(tempContext != null) { - context = (Var)tempContext.clone(); - } else { - context = new Var(); - } - try { - this.init(); - } catch (RyaTypeResolverException e) { - e.printStackTrace(); - } - } - - - public StarQuery(Set<StatementPattern> nodes) { - this(Lists.newArrayList(nodes)); - } - - public int size() { - return nodes.size(); - } - - public StarQuery(StarQuery other) { - this(other.nodes); - } - - - public List<StatementPattern> getNodes() { - return nodes; - 
} - - - public TextColumn[] getColumnCond() { - return nodeColumnCond; - } - - - public boolean isCommonVarURI() { - return isCommonVarURI; - } - - public String getCommonVarName() { - return commonVarName; - } - - public Var getCommonVar() { - return commonVar; - } - - public boolean commonVarHasValue() { - return commonVar.getValue() != null; - } - - public boolean commonVarConstant() { - return commonVar.isConstant(); - } - - public String getCommonVarValue() { - if(commonVarHasValue()) { - return commonVar.getValue().stringValue(); - } else { - return null; - } - } - - - public Set<String> getUnCommonVars() { - return varPos.keySet(); - } - - - public Map<String,Integer> getVarPos() { - return varPos; - } - - public boolean hasContext() { - return context.getValue() != null; - } - - public String getContextURI() { - return contextURI; - } - - - - - public Set<String> getBindingNames() { - - Set<String> bindingNames = Sets.newHashSet(); - - for(StatementPattern sp: nodes) { - - if(bindingNames.size() == 0) { - bindingNames = sp.getBindingNames(); - } else { - bindingNames = Sets.union(bindingNames, sp.getBindingNames()); - } - - } - - return bindingNames; - - } - - - - - public Set<String> getAssuredBindingNames() { - - Set<String> bindingNames = Sets.newHashSet(); - - for(StatementPattern sp: nodes) { - - if(bindingNames.size() == 0) { - bindingNames = sp.getAssuredBindingNames(); - } else { - bindingNames = Sets.union(bindingNames, sp.getAssuredBindingNames()); - } - - } - - return bindingNames; - - } - - - - - - - - public CardinalityStatementPattern getMinCardSp(AccumuloSelectivityEvalDAO ase) { - - StatementPattern minSp = null; - double cardinality = Double.MAX_VALUE; - double tempCard = -1; - - for (StatementPattern sp : nodes) { - - try { - tempCard = ase.getCardinality(ase.getConf(), sp); - - if (tempCard < cardinality) { - cardinality = tempCard; - minSp = sp; - } - } catch (TableNotFoundException e) { - e.printStackTrace(); - } - - - } - - return new 
CardinalityStatementPattern(minSp, cardinality) ; - } - - - - public class CardinalityStatementPattern { - - private StatementPattern sp; - private double cardinality; - - public CardinalityStatementPattern(StatementPattern sp, double cardinality) { - this.sp = sp; - this.cardinality = cardinality; - } - - public StatementPattern getSp() { - return sp; - } - - public double getCardinality() { - return cardinality; - } - - } - - - public double getCardinality( AccumuloSelectivityEvalDAO ase) { - - double cardinality = Double.MAX_VALUE; - double tempCard = -1; - - ase.setDenormalized(true); - - try { - - for (int i = 0; i < nodes.size(); i++) { - for (int j = i + 1; j < nodes.size(); j++) { - - tempCard = ase.getJoinSelect(ase.getConf(), nodes.get(i), nodes.get(j)); - - if (tempCard < cardinality) { - cardinality = tempCard; - } - - } - } - - } catch (Exception e) { - e.printStackTrace(); - } - - ase.setDenormalized(false); - - return cardinality/(nodes.size() + 1); - - } - - - - public static Set<String> getCommonVars(StarQuery query, BindingSet bs) { - - Set<String> starQueryVarNames = Sets.newHashSet(); - - if(bs == null || bs.size() == 0) { - return Sets.newHashSet(); - } - - Set<String> bindingNames = bs.getBindingNames(); - starQueryVarNames.addAll(query.getUnCommonVars()); - if(!query.commonVarConstant()) { - starQueryVarNames.add(query.getCommonVarName()); - } - - return Sets.intersection(bindingNames, starQueryVarNames); - - - } - - - - - - - public static StarQuery getConstrainedStarQuery(StarQuery query, BindingSet bs) { - - if(bs.size() == 0) { - return query; - } - - Set<String> bindingNames = bs.getBindingNames(); - Set<String> unCommonVarNames = query.getUnCommonVars(); - Set<String> intersectVar = Sets.intersection(bindingNames, unCommonVarNames); - - - if (!query.commonVarConstant()) { - - Value v = bs.getValue(query.getCommonVarName()); - - if (v != null) { - query.commonVar.setValue(v); - } - } - - for(String s: intersectVar) { - try { - 
query.nodeColumnCond[query.varPos.get(s)] = query.setValue(query.nodeColumnCond[query.varPos.get(s)], bs.getValue(s)); - } catch (RyaTypeResolverException e) { - e.printStackTrace(); - } - } - - return query; - } - - - private TextColumn setValue(TextColumn tc, Value v) throws RyaTypeResolverException { - - String cq = tc.getColumnQualifier().toString(); - String[] cqArray = cq.split("\u0000"); - - if (cqArray[0].equals("subject")) { - // RyaURI subjURI = (RyaURI) RdfToRyaConversions.convertValue(v); - tc.setColumnQualifier(new Text("subject" + "\u0000" + v.stringValue())); - tc.setIsPrefix(false); - } else if (cqArray[0].equals("object")) { - RyaType objType = RdfToRyaConversions.convertValue(v); - byte[][] b1 = RyaContext.getInstance().serializeType(objType); - byte[] b2 = Bytes.concat("object".getBytes(), - "\u0000".getBytes(), b1[0], b1[1]); - tc.setColumnQualifier(new Text(b2)); - tc.setIsPrefix(false); - } else { - throw new IllegalStateException("Invalid direction!"); - } - - return tc; - - } - - - - //assumes nodes forms valid star query with only one common variable - //assumes nodes and commonVar has been set - private TextColumn nodeToTextColumn(StatementPattern node, int i) throws RyaTypeResolverException { - - RyaContext rc = RyaContext.getInstance(); - - Var subjVar = node.getSubjectVar(); - Var predVar = node.getPredicateVar(); - Var objVar = node.getObjectVar(); - - RyaURI predURI = (RyaURI) RdfToRyaConversions.convertValue(node.getPredicateVar().getValue()); - - - //assumes StatementPattern contains at least on variable - if (subjVar.isConstant()) { - if (commonVarConstant()) { - varPos.put(objVar.getName(), i); - return new TextColumn(new Text(predURI.getData()), new Text("object")); - } else { - return new TextColumn(new Text(predURI.getData()), new Text("subject" + "\u0000" - + subjVar.getValue().stringValue())); - } - - } else if (objVar.isConstant()) { - - if (commonVarConstant()) { - varPos.put(subjVar.getName(), i); - return new 
TextColumn(new Text(predURI.getData()), new Text("subject")); - } else { - - isCommonVarURI = true; - RyaType objType = RdfToRyaConversions.convertValue(objVar.getValue()); - byte[][] b1 = rc.serializeType(objType); - - byte[] b2 = Bytes.concat("object".getBytes(), "\u0000".getBytes(), b1[0], b1[1]); - return new TextColumn(new Text(predURI.getData()), new Text(b2)); - } - - } else { - if (subjVar.getName().equals(commonVarName)) { - - isCommonVarURI = true; - varPos.put(objVar.getName(), i); - - TextColumn tc = new TextColumn(new Text(predURI.getData()), new Text("object")); - tc.setIsPrefix(true); - return tc; - - } else { - - varPos.put(subjVar.getName(), i); - - TextColumn tc = new TextColumn(new Text(predURI.getData()), new Text("subject")); - tc.setIsPrefix(true); - return tc; - - } - - - } - - - } - - - - - //called in constructor after nodes set - //assumes nodes and nodeColumnCond are same size - private void init() throws RyaTypeResolverException { - - - commonVar = this.getCommonVar(nodes); - if(!commonVar.isConstant()) { - commonVarName = commonVar.getName(); - } else { - commonVarName = commonVar.getName().substring(7); - } - - if(hasContext()) { - RyaURI ctxtURI = (RyaURI) RdfToRyaConversions.convertValue(context.getValue()); - contextURI = ctxtURI.getData(); - } - - for(int i = 0; i < nodes.size(); i++){ - nodeColumnCond[i] = nodeToTextColumn(nodes.get(i), i); - } - - } - - - - - - - - - // called after nodes set - // assumes nodes forms valid query with single, common variable - private Var getCommonVar(List<StatementPattern> nodes) { - - Set<Var> vars = null; - List<Var> tempVar; - Set<Var> tempSet; - - int i = 0; - for (StatementPattern sp : nodes) { - - if (vars == null) { - vars = Sets.newHashSet(); - vars.add(sp.getSubjectVar()); - vars.add(sp.getObjectVar()); - } else { - tempSet = Sets.newHashSet(); - tempSet.add(sp.getSubjectVar()); - tempSet.add(sp.getObjectVar()); - vars = Sets.intersection(vars, tempSet); - } - - } - - if (vars.size() == 
1) { - return vars.iterator().next(); - } else if (vars.size() > 1) { - Var first = null; - - i = 0; - - for (Var v : vars) { - i++; - - if (i == 1) { - first = v; - } else { - if (v.isConstant()) { - return v; - } - } - } - - return first; - - } else { - throw new IllegalStateException("No common Var!"); - } - - } - - - //assumes bindings is not of size 0 - private static boolean isBindingsetValid(Set<String> bindings) { - - int varCount = 0; - - if (bindings.size() == 1) { - return true; - } else { - - - for (String s : bindings) { - if (!s.startsWith("-const-")) { - varCount++; - } - if (varCount > 1) { - return false; - } - } - - return true; - - } - - } - - - - - - public static boolean isValidStarQuery(Collection<StatementPattern> nodes) { - - Set<String> bindings = null; - boolean contextSet = false; - Var context = null; - - if(nodes.size() < 2) { - return false; - } - - for(StatementPattern sp: nodes) { - - Var tempContext = sp.getContextVar(); - Var predVar = sp.getPredicateVar(); - - //does not support variable context - if(tempContext != null && !tempContext.isConstant()) { - return false; - } - if(!contextSet) { - context = tempContext; - contextSet = true; - } else { - - if(context == null && tempContext != null) { - return false; - } else if (context != null && !context.equals(tempContext)) { - return false; - } - } - - if(!predVar.isConstant()) { - return false; - } - - if(bindings == null ) { - bindings = sp.getBindingNames(); - if(bindings.size() == 0) { - return false; - } - } else { - bindings = Sets.intersection(bindings, sp.getBindingNames()); - if(bindings.size() == 0) { - return false; - } - } - - } - - - return isBindingsetValid(bindings); - } - - - - - -// private static Set<String> getSpVariables(StatementPattern sp) { -// -// Set<String> variables = Sets.newHashSet(); -// List<Var> varList = sp.getVarList(); -// -// for(Var v: varList) { -// if(!v.isConstant()) { -// variables.add(v.getName()); -// } -// } -// -// return variables; -// 
-// } -// - - - - - - public String toString() { - - String s = "Term conditions: " + "\n"; - - for(int i = 0; i < this.nodeColumnCond.length; i++) { - s = s + nodeColumnCond[i].toString() + "\n"; - } - - s = s + "Common Var: " + this.commonVar.toString() + "\n"; - s = s + "Context: " + this.contextURI; - - return s; - - } - - - - - - -}
