http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java b/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java new file mode 100644 index 0000000..1677fed --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java @@ -0,0 +1,287 @@ +package mvm.rya.indexing; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.joda.time.DateTime; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.QueryModelVisitor; + +import com.google.common.base.Joiner; +import com.google.common.collect.Maps; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import info.aduna.iteration.CloseableIteration; +import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; + +//Indexing Node for temporal expressions to be inserted into execution plan +//to delegate temporal portion of query to temporal index +public class TemporalTupleSet extends ExternalTupleSet { + + private final Configuration conf; + private final TemporalIndexer temporalIndexer; + private final IndexingExpr filterInfo; + + public TemporalTupleSet(final IndexingExpr filterInfo, final TemporalIndexer temporalIndexer) { + this.filterInfo = filterInfo; + this.temporalIndexer = temporalIndexer; + conf = temporalIndexer.getConf(); + } + + /** + * {@inheritDoc} + */ + @Override + public Set<String> getBindingNames() { + return filterInfo.getBindingNames(); + } + + /** + * {@inheritDoc} + * <p> + * Note that we need a deep copy for everything that (during optimizations) + * can be altered via {@link #visitChildren(QueryModelVisitor)} + */ + @Override + public TemporalTupleSet clone() { + return new TemporalTupleSet(filterInfo, temporalIndexer); + } + + @Override + public double cardinality() { + return 0.0; // No idea how the estimate cardinality here. + } + + @Override + public String getSignature() { + + return "(TemporalTuple Projection) " + "variables: " + Joiner.on(", ").join(getBindingNames()).replaceAll("\\s+", " "); + } + + @Override + public boolean equals(final Object other) { + if (other == this) { + return true; + } + if (!(other instanceof TemporalTupleSet)) { + return false; + } + final TemporalTupleSet arg = (TemporalTupleSet) other; + return filterInfo.equals(arg.filterInfo); + } + + @Override + public int hashCode() { + int result = 17; + result = 31*result + filterInfo.hashCode(); + + return result; + } + + /** + * Returns an iterator over the result set associated with contained IndexingExpr. + * <p> + * Should be thread-safe (concurrent invocation {@link OfflineIterable} this + * method can be expected with some query evaluators. 
+ */ + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final BindingSet bindings) + throws QueryEvaluationException { + final URI funcURI = filterInfo.getFunction(); + final SearchFunction searchFunction = (new TemporalSearchFunctionFactory(conf)).getSearchFunction(funcURI); + + if(filterInfo.getArguments().length > 1) { + throw new IllegalArgumentException("Index functions do not support more than two arguments."); + } + + final String queryText = filterInfo.getArguments()[0].stringValue(); + return IteratorFactory.getIterator(filterInfo.getSpConstraint(), bindings, queryText, searchFunction); + } + + //returns appropriate search function for a given URI + //search functions used by TemporalIndexer to query Temporal Index + private class TemporalSearchFunctionFactory { + private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap(); + Configuration conf; + + public TemporalSearchFunctionFactory(final Configuration conf) { + this.conf = conf; + } + + /** + * Get a {@link TemporalSearchFunction} for a give URI. 
+ * + * @param searchFunction + * @return + */ + public SearchFunction getSearchFunction(final URI searchFunction) { + SearchFunction geoFunc = null; + try { + geoFunc = getSearchFunctionInternal(searchFunction); + } catch (final QueryEvaluationException e) { + e.printStackTrace(); + } + + return geoFunc; + } + + private SearchFunction getSearchFunctionInternal(final URI searchFunction) throws QueryEvaluationException { + final SearchFunction sf = SEARCH_FUNCTION_MAP.get(searchFunction); + + if (sf != null) { + return sf; + } else { + throw new QueryEvaluationException("Unknown Search Function: " + searchFunction.stringValue()); + } + } + + private final SearchFunction TEMPORAL_InstantAfterInstant = new SearchFunction() { + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms, + final StatementConstraints contraints) throws QueryEvaluationException { + final TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms)); + return temporalIndexer.queryInstantAfterInstant(queryInstant, contraints); + } + + @Override + public String toString() { + return "TEMPORAL_InstantAfterInstant"; + }; + }; + private final SearchFunction TEMPORAL_InstantBeforeInstant = new SearchFunction() { + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms, + final StatementConstraints contraints) throws QueryEvaluationException { + final TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms)); + return temporalIndexer.queryInstantBeforeInstant(queryInstant, contraints); + } + + @Override + public String toString() { + return "TEMPORAL_InstantBeforeInstant"; + }; + }; + + private final SearchFunction TEMPORAL_InstantEqualsInstant = new SearchFunction() { + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms, + final StatementConstraints contraints) throws 
QueryEvaluationException { + final TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms)); + return temporalIndexer.queryInstantEqualsInstant(queryInstant, contraints); + } + + @Override + public String toString() { + return "TEMPORAL_InstantEqualsInstant"; + }; + }; + + private final SearchFunction TEMPORAL_InstantAfterInterval = new SearchFunction() { + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms, + final StatementConstraints contraints) throws QueryEvaluationException { + final TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms); + return temporalIndexer.queryInstantAfterInterval(queryInterval, contraints); + } + + @Override + public String toString() { + return "TEMPORAL_InstantAfterInterval"; + }; + }; + + private final SearchFunction TEMPORAL_InstantBeforeInterval = new SearchFunction() { + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms, + final StatementConstraints contraints) throws QueryEvaluationException { + final TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms); + return temporalIndexer.queryInstantBeforeInterval(queryInterval, contraints); + } + + @Override + public String toString() { + return "TEMPORAL_InstantBeforeInterval"; + }; + }; + + private final SearchFunction TEMPORAL_InstantInsideInterval = new SearchFunction() { + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms, + final StatementConstraints contraints) throws QueryEvaluationException { + final TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms); + return temporalIndexer.queryInstantInsideInterval(queryInterval, contraints); + } + + @Override + public String toString() { + return "TEMPORAL_InstantInsideInterval"; + }; + }; + + private final SearchFunction 
TEMPORAL_InstantHasBeginningInterval = new SearchFunction() { + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms, + final StatementConstraints contraints) throws QueryEvaluationException { + final TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms); + return temporalIndexer.queryInstantHasBeginningInterval(queryInterval, contraints); + } + + @Override + public String toString() { + return "TEMPORAL_InstantHasBeginningInterval"; + }; + }; + + private final SearchFunction TEMPORAL_InstantHasEndInterval = new SearchFunction() { + @Override + public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms, + final StatementConstraints contraints) throws QueryEvaluationException { + final TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms); + return temporalIndexer.queryInstantHasEndInterval(queryInterval, contraints); + } + + @Override + public String toString() { + return "TEMPORAL_InstantHasEndInterval"; + }; + }; + + { + final String TEMPORAL_NS = "tag:rya-rdf.org,2015:temporal#"; + + SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"after"), TEMPORAL_InstantAfterInstant); + SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"before"), TEMPORAL_InstantBeforeInstant); + SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"equals"), TEMPORAL_InstantEqualsInstant); + + SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"beforeInterval"), TEMPORAL_InstantBeforeInterval); + SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"afterInterval"), TEMPORAL_InstantAfterInterval); + SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"insideInterval"), TEMPORAL_InstantInsideInterval); + SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"hasBeginningInterval"), + TEMPORAL_InstantHasBeginningInterval); + SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"hasEndInterval"), TEMPORAL_InstantHasEndInterval); + } + } +} \ No newline at end of 
file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java new file mode 100644 index 0000000..7c608de --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java @@ -0,0 +1,418 @@ +package mvm.rya.indexing.accumulo; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import static java.util.Objects.requireNonNull; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.log4j.Logger; +import org.openrdf.model.URI; +import org.openrdf.model.impl.URIImpl; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.instance.RyaDetails; +import mvm.rya.indexing.FilterFunctionOptimizer; +import mvm.rya.indexing.accumulo.entity.EntityCentricIndex; +import mvm.rya.indexing.accumulo.entity.EntityOptimizer; +import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; +import mvm.rya.indexing.accumulo.freetext.LuceneTokenizer; +import mvm.rya.indexing.accumulo.freetext.Tokenizer; +import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; +import mvm.rya.indexing.external.PrecomputedJoinIndexer; +import mvm.rya.indexing.mongodb.freetext.MongoFreeTextIndexer; +import 
mvm.rya.indexing.pcj.matching.PCJOptimizer; + +/** + * A set of configuration utils to read a Hadoop {@link Configuration} object and create Cloudbase/Accumulo objects. + * Soon will deprecate this class. Use installer for the set methods, use {@link RyaDetails} for the get methods. + * New code must separate parameters that are set at Rya install time from that which is specific to the client. + * Also Accumulo index tables are pushed down to the implementation and not configured in conf. + */ +public class ConfigUtils { + private static final Logger logger = Logger.getLogger(ConfigUtils.class); + + public static final String CLOUDBASE_TBL_PREFIX = "sc.cloudbase.tableprefix"; + public static final String CLOUDBASE_AUTHS = "sc.cloudbase.authorizations"; + public static final String CLOUDBASE_INSTANCE = "sc.cloudbase.instancename"; + public static final String CLOUDBASE_ZOOKEEPERS = "sc.cloudbase.zookeepers"; + public static final String CLOUDBASE_USER = "sc.cloudbase.username"; + public static final String CLOUDBASE_PASSWORD = "sc.cloudbase.password"; + + public static final String CLOUDBASE_WRITER_MAX_WRITE_THREADS = "sc.cloudbase.writer.maxwritethreads"; + public static final String CLOUDBASE_WRITER_MAX_LATENCY = "sc.cloudbase.writer.maxlatency"; + public static final String CLOUDBASE_WRITER_MAX_MEMORY = "sc.cloudbase.writer.maxmemory"; + + public static final String FREE_TEXT_QUERY_TERM_LIMIT = "sc.freetext.querytermlimit"; + + public static final String USE_FREETEXT = "sc.use_freetext"; + public static final String USE_TEMPORAL = "sc.use_temporal"; + public static final String USE_ENTITY = "sc.use_entity"; + public static final String USE_PCJ = "sc.use_pcj"; + public static final String USE_OPTIMAL_PCJ = "sc.use.optimal.pcj"; + public static final String USE_PCJ_UPDATER_INDEX = "sc.use.updater"; + + public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName"; + public static final String USE_PCJ_FLUO_UPDATER = "rya.indexing.pcj.updater.fluo"; 
+ public static final String PCJ_STORAGE_TYPE = "rya.indexing.pcj.storageType"; + public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType"; + + + public static final String USE_INDEXING_SAIL = "sc.use.indexing.sail"; + public static final String USE_EXTERNAL_SAIL = "sc.use.external.sail"; + + public static final String USE_MOCK_INSTANCE = ".useMockInstance"; + + public static final String NUM_PARTITIONS = "sc.cloudbase.numPartitions"; + + private static final int WRITER_MAX_WRITE_THREADS = 1; + private static final long WRITER_MAX_LATNECY = Long.MAX_VALUE; + private static final long WRITER_MAX_MEMORY = 10000L; + + public static final String DISPLAY_QUERY_PLAN = "query.printqueryplan"; + + public static final String FREETEXT_PREDICATES_LIST = "sc.freetext.predicates"; + public static final String FREETEXT_DOC_NUM_PARTITIONS = "sc.freetext.numPartitions.text"; + public static final String FREETEXT_TERM_NUM_PARTITIONS = "sc.freetext.numPartitions.term"; + + public static final String TOKENIZER_CLASS = "sc.freetext.tokenizer.class"; + + public static final String GEO_PREDICATES_LIST = "sc.geo.predicates"; + + public static final String TEMPORAL_PREDICATES_LIST = "sc.temporal.predicates"; + + public static final String USE_MONGO = "sc.useMongo"; + + public static boolean isDisplayQueryPlan(final Configuration conf){ + return conf.getBoolean(DISPLAY_QUERY_PLAN, false); + } + + /** + * get a value from the configuration file and throw an exception if the value does not exist. 
+ * + * @param conf + * @param key + * @return + */ + private static String getStringCheckSet(final Configuration conf, final String key) { + final String value = conf.get(key); + requireNonNull(value, key + " not set"); + return value; + } + + /** + * @param conf + * @param tablename + * @return if the table was created + * @throws AccumuloException + * @throws AccumuloSecurityException + * @throws TableExistsException + */ + public static boolean createTableIfNotExists(final Configuration conf, final String tablename) throws AccumuloException, AccumuloSecurityException, + TableExistsException { + final TableOperations tops = getConnector(conf).tableOperations(); + if (!tops.exists(tablename)) { + logger.info("Creating table: " + tablename); + tops.create(tablename); + return true; + } + return false; + } + + /** + * Lookup the table name prefix in the conf and throw an error if it is null. + * Future, get table prefix from RyaDetails -- the Rya instance name + * -- also getting info from the RyaDetails should happen within RyaSailFactory and not ConfigUtils. + * @param conf Rya configuration map where it extracts the prefix (instance name) + * @return index table prefix corresponding to this Rya instance + */ + public static String getTablePrefix(final Configuration conf) { + final String tablePrefix; + tablePrefix = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX); + requireNonNull(tablePrefix, "Configuration key: " + RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX + + " not set. 
Cannot generate table name."); + return tablePrefix; + } + + public static int getFreeTextTermLimit(final Configuration conf) { + return conf.getInt(FREE_TEXT_QUERY_TERM_LIMIT, 100); + } + + public static Set<URI> getFreeTextPredicates(final Configuration conf) { + return getPredicates(conf, FREETEXT_PREDICATES_LIST); + } + + public static Set<URI> getGeoPredicates(final Configuration conf) { + return getPredicates(conf, GEO_PREDICATES_LIST); + } + /** + * Used for indexing statements about date & time instances and intervals. + * @param conf + * @return Set of predicate URI's whose objects should be date time literals. + */ + public static Set<URI> getTemporalPredicates(final Configuration conf) { + return getPredicates(conf, TEMPORAL_PREDICATES_LIST); + } + + protected static Set<URI> getPredicates(final Configuration conf, final String confName) { + final String[] validPredicateStrings = conf.getStrings(confName, new String[] {}); + final Set<URI> predicates = new HashSet<URI>(); + for (final String prediateString : validPredicateStrings) { + predicates.add(new URIImpl(prediateString)); + } + return predicates; + } + + public static Tokenizer getFreeTextTokenizer(final Configuration conf) { + final Class<? 
extends Tokenizer> c = conf.getClass(TOKENIZER_CLASS, LuceneTokenizer.class, Tokenizer.class); + return ReflectionUtils.newInstance(c, conf); + } + + public static BatchWriter createDefaultBatchWriter(final String tablename, final Configuration conf) throws TableNotFoundException, + AccumuloException, AccumuloSecurityException { + final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf); + final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf); + final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf); + final Connector connector = ConfigUtils.getConnector(conf); + return connector.createBatchWriter(tablename, DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS); + } + + public static MultiTableBatchWriter createMultitableBatchWriter(final Configuration conf) throws AccumuloException, AccumuloSecurityException { + final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf); + final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf); + final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf); + final Connector connector = ConfigUtils.getConnector(conf); + return connector.createMultiTableBatchWriter(DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS); + } + + public static Scanner createScanner(final String tablename, final Configuration conf) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException { + final Connector connector = ConfigUtils.getConnector(conf); + final Authorizations auths = ConfigUtils.getAuthorizations(conf); + return connector.createScanner(tablename, auths); + + } + + public static BatchScanner createBatchScanner(final String tablename, final Configuration conf) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException { + final Connector connector = ConfigUtils.getConnector(conf); + final Authorizations auths = ConfigUtils.getAuthorizations(conf); + Integer numThreads = null; + if (conf instanceof RdfCloudTripleStoreConfiguration) { + 
numThreads = ((RdfCloudTripleStoreConfiguration) conf).getNumThreads(); + } else { + numThreads = conf.getInt(RdfCloudTripleStoreConfiguration.CONF_NUM_THREADS, 2); + } + return connector.createBatchScanner(tablename, auths, numThreads); + } + + public static int getWriterMaxWriteThreads(final Configuration conf) { + return conf.getInt(CLOUDBASE_WRITER_MAX_WRITE_THREADS, WRITER_MAX_WRITE_THREADS); + } + + public static long getWriterMaxLatency(final Configuration conf) { + return conf.getLong(CLOUDBASE_WRITER_MAX_LATENCY, WRITER_MAX_LATNECY); + } + + public static long getWriterMaxMemory(final Configuration conf) { + return conf.getLong(CLOUDBASE_WRITER_MAX_MEMORY, WRITER_MAX_MEMORY); + } + + public static String getUsername(final JobContext job) { + return getUsername(job.getConfiguration()); + } + + public static String getUsername(final Configuration conf) { + return conf.get(CLOUDBASE_USER); + } + + public static Authorizations getAuthorizations(final JobContext job) { + return getAuthorizations(job.getConfiguration()); + } + + public static Authorizations getAuthorizations(final Configuration conf) { + final String authString = conf.get(CLOUDBASE_AUTHS, ""); + if (authString.isEmpty()) { + return new Authorizations(); + } + return new Authorizations(authString.split(",")); + } + + public static Instance getInstance(final JobContext job) { + return getInstance(job.getConfiguration()); + } + + public static Instance getInstance(final Configuration conf) { + if (useMockInstance(conf)) { + return new MockInstance(conf.get(CLOUDBASE_INSTANCE)); + } + return new ZooKeeperInstance(conf.get(CLOUDBASE_INSTANCE), conf.get(CLOUDBASE_ZOOKEEPERS)); + } + + public static String getPassword(final JobContext job) { + return getPassword(job.getConfiguration()); + } + + public static String getPassword(final Configuration conf) { + return conf.get(CLOUDBASE_PASSWORD, ""); + } + + public static Connector getConnector(final JobContext job) throws AccumuloException, 
AccumuloSecurityException { + return getConnector(job.getConfiguration()); + } + + public static Connector getConnector(final Configuration conf) throws AccumuloException, AccumuloSecurityException { + final Instance instance = ConfigUtils.getInstance(conf); + + return instance.getConnector(getUsername(conf), getPassword(conf)); + } + + public static boolean useMockInstance(final Configuration conf) { + return conf.getBoolean(USE_MOCK_INSTANCE, false); + } + + protected static int getNumPartitions(final Configuration conf) { + return conf.getInt(NUM_PARTITIONS, 25); + } + + public static int getFreeTextDocNumPartitions(final Configuration conf) { + return conf.getInt(FREETEXT_DOC_NUM_PARTITIONS, getNumPartitions(conf)); + } + + public static int getFreeTextTermNumPartitions(final Configuration conf) { + return conf.getInt(FREETEXT_TERM_NUM_PARTITIONS, getNumPartitions(conf)); + } + + public static boolean getUseFreeText(final Configuration conf) { + return conf.getBoolean(USE_FREETEXT, false); + } + + public static boolean getUseTemporal(final Configuration conf) { + return conf.getBoolean(USE_TEMPORAL, false); + } + + public static boolean getUseEntity(final Configuration conf) { + return conf.getBoolean(USE_ENTITY, false); + } + + public static boolean getUsePCJ(final Configuration conf) { + return conf.getBoolean(USE_PCJ, false); + } + + public static boolean getUseOptimalPCJ(final Configuration conf) { + return conf.getBoolean(USE_OPTIMAL_PCJ, false); + } + + public static boolean getUsePcjUpdaterIndex(final Configuration conf) { + return conf.getBoolean(USE_PCJ_UPDATER_INDEX, false); + } + + + /** + * @return The name of the Fluo Application this instance of RYA is + * using to incrementally update PCJs. 
+ */ + //TODO delete this eventually and use Details table + public Optional<String> getFluoAppName(Configuration conf) { + return Optional.fromNullable(conf.get(FLUO_APP_NAME)); + } + + + public static boolean getUseMongo(final Configuration conf) { + return conf.getBoolean(USE_MONGO, false); + } + + + public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) { + + final List<String> indexList = Lists.newArrayList(); + final List<String> optimizers = Lists.newArrayList(); + + boolean useFilterIndex = false; + + if (ConfigUtils.getUseMongo(conf)) { + if (getUseFreeText(conf)) { + indexList.add(MongoFreeTextIndexer.class.getName()); + useFilterIndex = true; + } + } else { + + if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) { + conf.setPcjOptimizer(PCJOptimizer.class); + } + + if(getUsePcjUpdaterIndex(conf)) { + indexList.add(PrecomputedJoinIndexer.class.getName()); + } + + + if (getUseFreeText(conf)) { + indexList.add(AccumuloFreeTextIndexer.class.getName()); + useFilterIndex = true; + } + + if (getUseTemporal(conf)) { + indexList.add(AccumuloTemporalIndexer.class.getName()); + useFilterIndex = true; + } + + } + + if (useFilterIndex) { + optimizers.add(FilterFunctionOptimizer.class.getName()); + } + + if (getUseEntity(conf)) { + indexList.add(EntityCentricIndex.class.getName()); + optimizers.add(EntityOptimizer.class.getName()); + + } + + conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[]{})); + conf.setStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[]{})); + + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java 
b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java new file mode 100644 index 0000000..4c1a3ad --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java @@ -0,0 +1,443 @@ +package mvm.rya.indexing.accumulo.entity; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTE; +import static mvm.rya.api.RdfCloudTripleStoreConstants.TYPE_DELIM_BYTE; +import info.aduna.iteration.CloseableIteration; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.Set; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.documentIndex.DocIndexIteratorUtil; +import mvm.rya.accumulo.documentIndex.DocumentIndexIntersectingIterator; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.resolver.RyaContext; +import mvm.rya.api.resolver.RyaToRdfConversions; +import mvm.rya.api.resolver.RyaTypeResolverException; +import mvm.rya.indexing.DocIdIndexer; +import mvm.rya.indexing.accumulo.ConfigUtils; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; +import org.openrdf.query.algebra.helpers.StatementPatternCollector; +import org.openrdf.query.parser.ParsedQuery; 
+import org.openrdf.query.parser.sparql.SPARQLParser; + +import com.google.common.base.Preconditions; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Sets; +import com.google.common.primitives.Bytes; + +public class AccumuloDocIdIndexer implements DocIdIndexer { + + + + private BatchScanner bs; + private AccumuloRdfConfiguration conf; + + public AccumuloDocIdIndexer(RdfCloudTripleStoreConfiguration conf) throws AccumuloException, AccumuloSecurityException { + Preconditions.checkArgument(conf instanceof RdfCloudTripleStoreConfiguration, "conf must be isntance of RdfCloudTripleStoreConfiguration"); + this.conf = (AccumuloRdfConfiguration) conf; + //Connector conn = ConfigUtils.getConnector(conf); + } + + + + + public CloseableIteration<BindingSet, QueryEvaluationException> queryDocIndex(String sparqlQuery, + Collection<BindingSet> constraints) throws TableNotFoundException, QueryEvaluationException { + + SPARQLParser parser = new SPARQLParser(); + ParsedQuery pq1 = null; + try { + pq1 = parser.parseQuery(sparqlQuery, null); + } catch (MalformedQueryException e) { + e.printStackTrace(); + } + + TupleExpr te1 = pq1.getTupleExpr(); + List<StatementPattern> spList1 = StatementPatternCollector.process(te1); + + if(StarQuery.isValidStarQuery(spList1)) { + StarQuery sq1 = new StarQuery(spList1); + return queryDocIndex(sq1, constraints); + } else { + throw new IllegalArgumentException("Invalid star query!"); + } + + } + + + + + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> queryDocIndex(StarQuery query, + Collection<BindingSet> constraints) throws TableNotFoundException, QueryEvaluationException { + + final StarQuery starQ = query; + final Iterator<BindingSet> bs = constraints.iterator(); + final Iterator<BindingSet> bs2 = constraints.iterator(); + final Set<String> unCommonVarNames; + final Set<String> commonVarNames; + if (bs2.hasNext()) { + BindingSet currBs = bs2.next(); + commonVarNames = 
StarQuery.getCommonVars(query, currBs); + unCommonVarNames = Sets.difference(currBs.getBindingNames(), commonVarNames); + } else { + commonVarNames = Sets.newHashSet(); + unCommonVarNames = Sets.newHashSet(); + } + + if( commonVarNames.size() == 1 && !query.commonVarConstant() && commonVarNames.contains(query.getCommonVarName())) { + + final HashMultimap<String, BindingSet> map = HashMultimap.create(); + final String commonVar = starQ.getCommonVarName(); + final Iterator<Entry<Key, Value>> intersections; + final BatchScanner scan; + Set<Range> ranges = Sets.newHashSet(); + + while(bs.hasNext()) { + + BindingSet currentBs = bs.next(); + + if(currentBs.getBinding(commonVar) == null) { + continue; + } + + String row = currentBs.getBinding(commonVar).getValue().stringValue(); + ranges.add(new Range(row)); + map.put(row, currentBs); + + } + scan = runQuery(starQ, ranges); + intersections = scan.iterator(); + + + return new CloseableIteration<BindingSet, QueryEvaluationException>() { + + + private QueryBindingSet currentSolutionBs = null; + private boolean hasNextCalled = false; + private boolean isEmpty = false; + private Iterator<BindingSet> inputSet = new ArrayList<BindingSet>().iterator(); + private BindingSet currentBs; + private Key key; + + + + @Override + public boolean hasNext() throws QueryEvaluationException { + if (!hasNextCalled && !isEmpty) { + while (inputSet.hasNext() || intersections.hasNext()) { + if (!inputSet.hasNext()) { + key = intersections.next().getKey(); + inputSet = map.get(key.getRow().toString()).iterator(); + } + currentBs = inputSet.next(); + currentSolutionBs = deserializeKey(key, starQ, currentBs, unCommonVarNames); + + if (currentSolutionBs.size() == unCommonVarNames.size() + starQ.getUnCommonVars().size() +1) { + hasNextCalled = true; + return true; + } + + } + + isEmpty = true; + return false; + + } else if (isEmpty) { + return false; + } else { + return true; + } + + } + + + @Override + public BindingSet next() throws 
QueryEvaluationException { + + if (hasNextCalled) { + hasNextCalled = false; + } else if (isEmpty) { + throw new NoSuchElementException(); + } else { + if (this.hasNext()) { + hasNextCalled = false; + } else { + throw new NoSuchElementException(); + } + } + + return currentSolutionBs; + } + + @Override + public void remove() throws QueryEvaluationException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws QueryEvaluationException { + scan.close(); + } + + }; + + + } else { + + return new CloseableIteration<BindingSet, QueryEvaluationException>() { + + @Override + public void remove() throws QueryEvaluationException { + throw new UnsupportedOperationException(); + } + + private Iterator<Entry<Key, Value>> intersections = null; + private QueryBindingSet currentSolutionBs = null; + private boolean hasNextCalled = false; + private boolean isEmpty = false; + private boolean init = false; + private BindingSet currentBs; + private StarQuery sq = new StarQuery(starQ); + private Set<Range> emptyRangeSet = Sets.newHashSet(); + private BatchScanner scan; + + @Override + public BindingSet next() throws QueryEvaluationException { + if (hasNextCalled) { + hasNextCalled = false; + } else if (isEmpty) { + throw new NoSuchElementException(); + } else { + if (this.hasNext()) { + hasNextCalled = false; + } else { + throw new NoSuchElementException(); + } + } + return currentSolutionBs; + } + + @Override + public boolean hasNext() throws QueryEvaluationException { + + if (!init) { + if (intersections == null && bs.hasNext()) { + currentBs = bs.next(); + sq = StarQuery.getConstrainedStarQuery(sq, currentBs); + scan = runQuery(sq,emptyRangeSet); + intersections = scan.iterator(); + // binding set empty + } else if (intersections == null && !bs.hasNext()) { + currentBs = new QueryBindingSet(); + scan = runQuery(starQ,emptyRangeSet); + intersections = scan.iterator(); + } + + init = true; + } + + if (!hasNextCalled && !isEmpty) { + while 
(intersections.hasNext() || bs.hasNext()) { + if (!intersections.hasNext()) { + scan.close(); + currentBs = bs.next(); + sq = StarQuery.getConstrainedStarQuery(sq, currentBs); + scan = runQuery(sq,emptyRangeSet); + intersections = scan.iterator(); + } + if (intersections.hasNext()) { + currentSolutionBs = deserializeKey(intersections.next().getKey(), sq, currentBs, + unCommonVarNames); + } else { + continue; + } + + if (sq.commonVarConstant() && currentSolutionBs.size() == unCommonVarNames.size() + sq.getUnCommonVars().size()) { + hasNextCalled = true; + return true; + } else if(currentSolutionBs.size() == unCommonVarNames.size() + sq.getUnCommonVars().size() + 1) { + hasNextCalled = true; + return true; + } + } + + isEmpty = true; + return false; + + } else if (isEmpty) { + return false; + } else { + return true; + } + } + + @Override + public void close() throws QueryEvaluationException { + scan.close(); + } + }; + } + } + + private QueryBindingSet deserializeKey(Key key, StarQuery sq, BindingSet currentBs, Set<String> unCommonVar) { + + + QueryBindingSet currentSolutionBs = new QueryBindingSet(); + + Text row = key.getRow(); + Text cq = key.getColumnQualifier(); + + + String[] cqArray = cq.toString().split(DocIndexIteratorUtil.DOC_ID_INDEX_DELIM); + + boolean commonVarSet = false; + + //if common Var is constant there is no common variable to assign a value to + if(sq.commonVarConstant()) { + commonVarSet = true; + } + + if (!commonVarSet && sq.isCommonVarURI()) { + RyaURI rURI = new RyaURI(row.toString()); + currentSolutionBs.addBinding(sq.getCommonVarName(), + RyaToRdfConversions.convertValue(rURI)); + commonVarSet = true; + } + + for (String s : sq.getUnCommonVars()) { + + byte[] cqBytes = cqArray[sq.getVarPos().get(s)].getBytes(); + int firstIndex = Bytes.indexOf(cqBytes, DELIM_BYTE); + int secondIndex = Bytes.lastIndexOf(cqBytes, DELIM_BYTE); + int typeIndex = Bytes.indexOf(cqBytes, TYPE_DELIM_BYTE); + byte[] tripleComponent = Arrays.copyOfRange(cqBytes, 
firstIndex + 1, secondIndex); + byte[] cqContent = Arrays.copyOfRange(cqBytes, secondIndex + 1, typeIndex); + byte[] objType = Arrays.copyOfRange(cqBytes, typeIndex, cqBytes.length); + + if (new String(tripleComponent).equals("object")) { + byte[] object = Bytes.concat(cqContent, objType); + org.openrdf.model.Value v = null; + try { + v = RyaToRdfConversions.convertValue(RyaContext.getInstance().deserialize( + object)); + } catch (RyaTypeResolverException e) { + e.printStackTrace(); + } + currentSolutionBs.addBinding(s, v); + + } else if (new String(tripleComponent).equals("subject")) { + if (!commonVarSet) { + byte[] object = Bytes.concat(row.getBytes(), objType); + org.openrdf.model.Value v = null; + try { + v = RyaToRdfConversions.convertValue(RyaContext.getInstance().deserialize( + object)); + } catch (RyaTypeResolverException e) { + e.printStackTrace(); + } + currentSolutionBs.addBinding(sq.getCommonVarName(), v); + commonVarSet = true; + } + RyaURI rURI = new RyaURI(new String(cqContent)); + currentSolutionBs.addBinding(s, RyaToRdfConversions.convertValue(rURI)); + } else { + throw new IllegalArgumentException("Invalid row."); + } + } + for (String s : unCommonVar) { + currentSolutionBs.addBinding(s, currentBs.getValue(s)); + } + return currentSolutionBs; + } + + private BatchScanner runQuery(StarQuery query, Collection<Range> ranges) throws QueryEvaluationException { + + try { + if (ranges.size() == 0) { + String rangeText = query.getCommonVarValue(); + Range r; + if (rangeText != null) { + r = new Range(new Text(query.getCommonVarValue())); + } else { + r = new Range(); + } + ranges = Collections.singleton(r); + } + + Connector accCon = ConfigUtils.getConnector(conf); + IteratorSetting is = new IteratorSetting(30, "fii", DocumentIndexIntersectingIterator.class); + + DocumentIndexIntersectingIterator.setColumnFamilies(is, query.getColumnCond()); + + if (query.hasContext()) { + DocumentIndexIntersectingIterator.setContext(is, query.getContextURI()); + } + bs 
= accCon.createBatchScanner(EntityCentricIndex.getTableName(conf), + new Authorizations(conf.get(ConfigUtils.CLOUDBASE_AUTHS)), 15); + bs.addScanIterator(is); + bs.setRanges(ranges); + + return bs; + + } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) { + throw new QueryEvaluationException(e); + } + } + + @Override + public void close() throws IOException { + //TODO generate an exception when BS passed in -- scanner closed +// if (bs != null) { +// bs.close(); +// } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java new file mode 100644 index 0000000..9a9daa5 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java @@ -0,0 +1,327 @@ +package mvm.rya.indexing.accumulo.entity; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV; +import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE; +import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTES; +import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES; +import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.openrdf.model.URI; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.primitives.Bytes; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.resolver.RyaContext; +import mvm.rya.api.resolver.RyaTypeResolverException; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.indexing.accumulo.ConfigUtils; + +public class EntityCentricIndex extends AbstractAccumuloIndexer { + + private 
static final Logger logger = Logger.getLogger(EntityCentricIndex.class); + private static final String TABLE_SUFFIX = "EntityCentricIndex"; + + private AccumuloRdfConfiguration conf; + private BatchWriter writer; + private boolean isInit = false; + + private void initInternal() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException, + TableExistsException { + ConfigUtils.createTableIfNotExists(conf, getTableName()); + } + + @Override + public Configuration getConf() { + return conf; + } + + //initialization occurs in setConf because index is created using reflection + @Override + public void setConf(final Configuration conf) { + if (conf instanceof AccumuloRdfConfiguration) { + this.conf = (AccumuloRdfConfiguration) conf; + } else { + this.conf = new AccumuloRdfConfiguration(conf); + } + if (!isInit) { + try { + initInternal(); + isInit = true; + } catch (final AccumuloException e) { + logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); + throw new RuntimeException(e); + } catch (final AccumuloSecurityException e) { + logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); + throw new RuntimeException(e); + } catch (final TableNotFoundException e) { + logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); + throw new RuntimeException(e); + } catch (final TableExistsException e) { + logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); + throw new RuntimeException(e); + } catch (final IOException e) { + logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); + throw new RuntimeException(e); + } + } + } + + /** + * Get the Accumulo table used by this index. + * @return table used by instances of this index + */ + @Override + public String getTableName() { + return getTableName(conf); + } + + /** + * Get the Accumulo table that will be used by this index. 
+ * @param conf + * @return table name guaranteed to be used by instances of this index + */ + public static String getTableName(Configuration conf) { + return ConfigUtils.getTablePrefix(conf) + TABLE_SUFFIX; + } + + @Override + public void setMultiTableBatchWriter(final MultiTableBatchWriter writer) throws IOException { + try { + this.writer = writer.getBatchWriter(getTableName()); + } catch (final AccumuloException e) { + throw new IOException(e); + } catch (final AccumuloSecurityException e) { + throw new IOException(e); + } catch (final TableNotFoundException e) { + throw new IOException(e); + } + } + + @Override + public void storeStatement(final RyaStatement stmt) throws IOException { + Preconditions.checkNotNull(writer, "BatchWriter not Set"); + try { + for (final TripleRow row : serializeStatement(stmt)) { + writer.addMutation(createMutation(row)); + } + } catch (final MutationsRejectedException e) { + throw new IOException(e); + } catch (final RyaTypeResolverException e) { + throw new IOException(e); + } + } + + @Override + public void deleteStatement(final RyaStatement stmt) throws IOException { + Preconditions.checkNotNull(writer, "BatchWriter not Set"); + try { + for (final TripleRow row : serializeStatement(stmt)) { + writer.addMutation(deleteMutation(row)); + } + } catch (final MutationsRejectedException e) { + throw new IOException(e); + } catch (final RyaTypeResolverException e) { + throw new IOException(e); + } + } + + protected Mutation deleteMutation(final TripleRow tripleRow) { + final Mutation m = new Mutation(new Text(tripleRow.getRow())); + + final byte[] columnFamily = tripleRow.getColumnFamily(); + final Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily); + + final byte[] columnQualifier = tripleRow.getColumnQualifier(); + final Text cqText = columnQualifier == null ? 
EMPTY_TEXT : new Text(columnQualifier); + + final byte[] columnVisibility = tripleRow.getColumnVisibility(); + final ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility); + + m.putDelete(cfText, cqText, cv, tripleRow.getTimestamp()); + return m; + } + + public static Collection<Mutation> createMutations(final RyaStatement stmt) throws RyaTypeResolverException{ + final Collection<Mutation> m = Lists.newArrayList(); + for (final TripleRow tr : serializeStatement(stmt)){ + m.add(createMutation(tr)); + } + return m; + } + + private static Mutation createMutation(final TripleRow tripleRow) { + final Mutation mutation = new Mutation(new Text(tripleRow.getRow())); + final byte[] columnVisibility = tripleRow.getColumnVisibility(); + final ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility); + final Long timestamp = tripleRow.getTimestamp(); + final byte[] value = tripleRow.getValue(); + final Value v = value == null ? EMPTY_VALUE : new Value(value); + final byte[] columnQualifier = tripleRow.getColumnQualifier(); + final Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier); + final byte[] columnFamily = tripleRow.getColumnFamily(); + final Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily); + + mutation.put(cfText, cqText, cv, timestamp, v); + return mutation; + } + + private static List<TripleRow> serializeStatement(final RyaStatement stmt) throws RyaTypeResolverException { + final RyaURI subject = stmt.getSubject(); + final RyaURI predicate = stmt.getPredicate(); + final RyaType object = stmt.getObject(); + final RyaURI context = stmt.getContext(); + final Long timestamp = stmt.getTimestamp(); + final byte[] columnVisibility = stmt.getColumnVisibility(); + final byte[] value = stmt.getValue(); + assert subject != null && predicate != null && object != null; + final byte[] cf = (context == null) ? 
EMPTY_BYTES : context.getData().getBytes(); + final byte[] subjBytes = subject.getData().getBytes(); + final byte[] predBytes = predicate.getData().getBytes(); + final byte[][] objBytes = RyaContext.getInstance().serializeType(object); + + return Lists.newArrayList(new TripleRow(subjBytes, + predBytes, + Bytes.concat(cf, DELIM_BYTES, + "object".getBytes(), DELIM_BYTES, + objBytes[0], objBytes[1]), + timestamp, + columnVisibility, + value), + new TripleRow(objBytes[0], + predBytes, + Bytes.concat(cf, DELIM_BYTES, + "subject".getBytes(), DELIM_BYTES, + subjBytes, objBytes[1]), + timestamp, + columnVisibility, + value)); + } + + /** + * Deserialize a row from the entity-centric index. + * @param key Row key, contains statement data + * @param value Row value + * @return The statement represented by the row + * @throws IOException if edge direction can't be extracted as expected. + * @throws RyaTypeResolverException if a type error occurs deserializing the statement's object. + */ + public static RyaStatement deserializeStatement(Key key, Value value) throws RyaTypeResolverException, IOException { + assert key != null; + assert value != null; + byte[] entityBytes = key.getRowData().toArray(); + byte[] predicateBytes = key.getColumnFamilyData().toArray(); + byte[] data = key.getColumnQualifierData().toArray(); + long timestamp = key.getTimestamp(); + byte[] columnVisibility = key.getColumnVisibilityData().toArray(); + byte[] valueBytes = value.get(); + + // main entity is either the subject or object + // data contains: column family , var name of other node , data of other node + datatype of object + int split = Bytes.indexOf(data, DELIM_BYTES); + byte[] columnFamily = Arrays.copyOf(data, split); + byte[] edgeBytes = Arrays.copyOfRange(data, split + DELIM_BYTES.length, data.length); + split = Bytes.indexOf(edgeBytes, DELIM_BYTES); + String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split)); + byte[] otherNodeBytes = Arrays.copyOfRange(edgeBytes, split + 
DELIM_BYTES.length, edgeBytes.length - 2); + byte[] typeBytes = Arrays.copyOfRange(edgeBytes, edgeBytes.length - 2, edgeBytes.length); + byte[] objectBytes; + RyaURI subject; + RyaURI predicate = new RyaURI(new String(predicateBytes)); + RyaType object; + RyaURI context = null; + // Expect either: entity=subject.data, otherNodeVar="object", otherNodeBytes={object.data, object.datatype_marker} + // or: entity=object.data, otherNodeVar="subject", otherNodeBytes={subject.data, object.datatype_marker} + switch (otherNodeVar) { + case "subject": + subject = new RyaURI(new String(otherNodeBytes)); + objectBytes = Bytes.concat(entityBytes, typeBytes); + break; + case "object": + subject = new RyaURI(new String(entityBytes)); + objectBytes = Bytes.concat(otherNodeBytes, typeBytes); + break; + default: + throw new IOException("Failed to deserialize entity-centric index row. " + + "Expected 'subject' or 'object', encountered: '" + otherNodeVar + "'"); + } + object = RyaContext.getInstance().deserialize(objectBytes); + if (columnFamily != null && columnFamily.length > 0) { + context = new RyaURI(new String(columnFamily)); + } + return new RyaStatement(subject, predicate, object, context, + null, columnVisibility, valueBytes, timestamp); + } + + @Override + public void init() { + } + + @Override + public void setConnector(final Connector connector) { + } + + @Override + public void destroy() { + } + + @Override + public void purge(final RdfCloudTripleStoreConfiguration configuration) { + } + + @Override + public void dropAndDestroy() { + } + + @Override + public Set<URI> getIndexablePredicates() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java 
b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java new file mode 100644 index 0000000..2030e58 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java @@ -0,0 +1,171 @@ +package mvm.rya.indexing.accumulo.entity; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.Set; +import mvm.rya.indexing.accumulo.ConfigUtils; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; + +public class EntityLocalityGroupSetter { + + + String tablePrefix; + Connector conn; + Configuration conf; + + public EntityLocalityGroupSetter(String tablePrefix, Connector conn, Configuration conf) { + this.conn = conn; + this.tablePrefix = tablePrefix; + this.conf = conf; + } + + + + private Iterator<String> getPredicates() { + + String auths = conf.get(ConfigUtils.CLOUDBASE_AUTHS); + BatchScanner bs = null; + try { + bs = conn.createBatchScanner(tablePrefix + "prospects", new Authorizations(auths), 10); + } catch (TableNotFoundException e) { + e.printStackTrace(); + } + bs.setRanges(Collections.singleton(Range.prefix(new Text("predicate" + "\u0000")))); + final Iterator<Entry<Key,Value>> iter = bs.iterator(); + + return new Iterator<String>() { + + private String next = null; + private boolean hasNextCalled = false; + private boolean isEmpty = false; + + @Override + public boolean hasNext() { + + if (!hasNextCalled && !isEmpty) { + while (iter.hasNext()) { + Entry<Key,Value> temp = iter.next(); + String row = temp.getKey().getRow().toString(); + String[] rowArray = row.split("\u0000"); + next = rowArray[1]; + + hasNextCalled = true; + return 
true; + } + isEmpty = true; + return false; + } else if(isEmpty) { + return false; + }else { + return true; + } + } + + @Override + public String next() { + + if (hasNextCalled) { + hasNextCalled = false; + return next; + } else if(isEmpty) { + throw new NoSuchElementException(); + }else { + if (this.hasNext()) { + hasNextCalled = false; + return next; + } else { + throw new NoSuchElementException(); + } + } + } + + @Override + public void remove() { + + throw new UnsupportedOperationException("Cannot delete from iterator!"); + + } + + }; + } + + + + + + + + + public void setLocalityGroups() { + + HashMap<String, Set<Text>> localityGroups = new HashMap<String, Set<Text>>(); + Iterator<String> groups = getPredicates(); + + int i = 1; + + while(groups.hasNext()) { + HashSet<Text> tempColumn = new HashSet<Text>(); + String temp = groups.next(); + tempColumn.add(new Text(temp)); + String groupName = "predicate" + i; + localityGroups.put(groupName, tempColumn); + i++; + } + + + try { + conn.tableOperations().setLocalityGroups(tablePrefix + "doc_partitioned_index", localityGroups); + //conn.tableOperations().compact(tablePrefix + "doc_partitioned_index", null, null, true, true); + } catch (AccumuloException e) { + e.printStackTrace(); + } catch (AccumuloSecurityException e) { + e.printStackTrace(); + } catch (TableNotFoundException e) { + e.printStackTrace(); + } + + + + } + + + + + + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityOptimizer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityOptimizer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityOptimizer.java new file mode 100644 index 0000000..bb792ac --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityOptimizer.java @@ -0,0 
+1,434 @@ +package mvm.rya.indexing.accumulo.entity; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.persist.joinselect.SelectivityEvalDAO; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.joinselect.AccumuloSelectivityEvalDAO; +import mvm.rya.prospector.service.ProspectorServiceEvalStatsDAO; +import mvm.rya.rdftriplestore.inference.DoNotExpandSP; +import mvm.rya.rdftriplestore.utils.FixedStatementPattern; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.openrdf.query.BindingSet; +import org.openrdf.query.Dataset; +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.Join; +import org.openrdf.query.algebra.QueryModelNode; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.TupleExpr; +import 
org.openrdf.query.algebra.evaluation.QueryOptimizer;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Query optimizer that rewrites join trees by grouping statement patterns that share a
+ * subject or object variable (and have a constant predicate) into {@link EntityTupleSet}
+ * star-query nodes. Bin selection can be guided by a selectivity DAO when statistics and
+ * selectivity are enabled in the configuration.
+ */
+public class EntityOptimizer implements QueryOptimizer, Configurable {
+
+    private SelectivityEvalDAO<RdfCloudTripleStoreConfiguration> eval;
+    private RdfCloudTripleStoreConfiguration conf;
+    private boolean isEvalDaoSet = false;
+
+    /**
+     * Creates an optimizer whose configuration (and selectivity DAO) is supplied later
+     * through {@link #setConf(Configuration)}.
+     */
+    public EntityOptimizer() {
+
+    }
+
+    /**
+     * Creates an optimizer for the given configuration. When both statistics and selectivity
+     * are enabled, an Accumulo-backed selectivity DAO is created and initialized; otherwise
+     * no DAO is used and cardinality estimates fall back to a neutral factor of 1.
+     *
+     * @param conf the triple store configuration
+     */
+    public EntityOptimizer(RdfCloudTripleStoreConfiguration conf) {
+        initEvalDao(conf);
+        this.conf = conf;
+    }
+
+    /**
+     * Creates an optimizer that uses the supplied selectivity DAO and takes its configuration
+     * from that DAO.
+     *
+     * @param eval the selectivity DAO used for cardinality estimates
+     */
+    public EntityOptimizer(SelectivityEvalDAO<RdfCloudTripleStoreConfiguration> eval) {
+        this.eval = eval;
+        this.conf = eval.getConf();
+        isEvalDaoSet = true;
+    }
+
+    /**
+     * Sets the configuration, wrapping a plain Hadoop configuration in an
+     * {@link AccumuloRdfConfiguration} if needed. The selectivity DAO is only created here if
+     * no constructor or earlier call has already set it up.
+     */
+    @Override
+    public void setConf(Configuration conf) {
+        if (conf instanceof RdfCloudTripleStoreConfiguration) {
+            this.conf = (RdfCloudTripleStoreConfiguration) conf;
+        } else {
+            this.conf = new AccumuloRdfConfiguration(conf);
+        }
+
+        if (!isEvalDaoSet) {
+            initEvalDao(this.conf);
+        }
+    }
+
+    /**
+     * Creates and initializes the Accumulo selectivity DAO when {@code config} enables both
+     * statistics and selectivity. If construction fails with an Accumulo error, {@code eval} is
+     * left {@code null} so estimates take the neutral path instead of using a partially
+     * initialized DAO.
+     */
+    private void initEvalDao(RdfCloudTripleStoreConfiguration config) {
+        eval = null;
+        if (config.isUseStats() && config.isUseSelectivity()) {
+            try {
+                AccumuloSelectivityEvalDAO dao = new AccumuloSelectivityEvalDAO(config, ConfigUtils.getConnector(config));
+                dao.setRdfEvalDAO(new ProspectorServiceEvalStatsDAO(ConfigUtils.getConnector(config), config));
+                dao.init();
+                eval = dao;
+            } catch (AccumuloException e) {
+                e.printStackTrace();
+            } catch (AccumuloSecurityException e) {
+                e.printStackTrace();
+            }
+        }
+        isEvalDaoSet = true;
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    /**
+     * Rewrites every join in the tree, replacing groups of statement patterns that share a
+     * common subject or object variable with {@link EntityTupleSet} nodes.
+     *
+     * @param tupleExpr the query tree to optimize in place
+     * @param dataset unused
+     * @param bindings unused
+     */
+    @Override
+    public void optimize(TupleExpr tupleExpr, Dataset dataset, BindingSet bindings) {
+        tupleExpr.visit(new JoinVisitor());
+    }
+
+    protected class JoinVisitor extends QueryModelVisitorBase<RuntimeException> {
+
+        /**
+         * Flattens the join, repeatedly extracts the highest-priority star-query bin into an
+         * {@link EntityTupleSet}, recurses into remaining complex args, and replaces the join
+         * with a rebuilt tree. Failures are printed and leave the node unchanged.
+         */
+        @Override
+        public void meet(Join node) {
+            try {
+                // Joins created by inference expansion are left untouched.
+                if (isUnexpandable(node)) {
+                    return;
+                }
+                List<TupleExpr> joinArgs = getJoinArgs(node, new ArrayList<TupleExpr>());
+                HashMultimap<String, StatementPattern> varMap = getVarBins(joinArgs);
+                while (!varMap.keySet().isEmpty()) {
+                    String s = getHighestPriorityKey(varMap);
+                    constructTuple(varMap, joinArgs, s);
+                }
+                List<TupleExpr> filterChain = getFilterChain(joinArgs);
+
+                for (TupleExpr te : joinArgs) {
+                    // Statement patterns and entity tuple sets contain no joins to optimize.
+                    if (!(te instanceof StatementPattern) && !(te instanceof EntityTupleSet)) {
+                        te.visit(this);
+                    }
+                }
+                // Replace old join hierarchy
+                node.replaceWith(getNewJoin(joinArgs, filterChain));
+
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+
+        /**
+         * Returns true for joins that must not be flattened: those whose left arg is a
+         * {@link FixedStatementPattern} or whose right arg is a {@link DoNotExpandSP}.
+         */
+        private boolean isUnexpandable(Join join) {
+            return join.getLeftArg() instanceof FixedStatementPattern
+                    || join.getRightArg() instanceof DoNotExpandSP;
+        }
+
+        /**
+         * Removes every {@link Filter} from {@code joinArgs} and links them, in encounter order,
+         * into one chain. Returns {@code [top]} for a single filter, {@code [top, bottom]} for
+         * several, or an empty list when there are none.
+         */
+        private List<TupleExpr> getFilterChain(List<TupleExpr> joinArgs) {
+            List<TupleExpr> filterTopBottom = Lists.newArrayList();
+            TupleExpr filterChainTop = null;
+            TupleExpr filterChainBottom = null;
+
+            for (int i = 0; i < joinArgs.size(); i++) {
+                if (joinArgs.get(i) instanceof Filter) {
+                    if (filterChainTop == null) {
+                        filterChainTop = joinArgs.remove(i);
+                        i--;
+                    } else if (filterChainBottom == null) {
+                        filterChainBottom = joinArgs.remove(i);
+                        ((Filter) filterChainTop).setArg(filterChainBottom);
+                        i--;
+                    } else {
+                        ((Filter) filterChainBottom).setArg(joinArgs.remove(i));
+                        filterChainBottom = ((Filter) filterChainBottom).getArg();
+                        i--;
+                    }
+                }
+            }
+            if (filterChainTop != null) {
+                filterTopBottom.add(filterChainTop);
+            }
+            if (filterChainBottom != null) {
+                filterTopBottom.add(filterChainBottom);
+            }
+            return filterTopBottom;
+        }
+
+        /**
+         * Rebuilds a join tree from {@code joinArgs}. Without filters this is a left-deep join of
+         * all args. With filters, a single arg is placed beneath the filter chain; otherwise all
+         * args but the first are joined beneath the chain, which is then joined with the first.
+         *
+         * @throws IllegalStateException if {@code joinArgs} is empty
+         */
+        private TupleExpr getNewJoin(List<TupleExpr> joinArgs, List<TupleExpr> filterChain) {
+            if (joinArgs.isEmpty()) {
+                throw new IllegalStateException("JoinArgs size cannot be zero.");
+            }
+            if (filterChain.isEmpty()) {
+                return leftDeepJoin(joinArgs);
+            }
+
+            TupleExpr top = filterChain.get(0);
+            // getFilterChain returns at most [top, bottom]; a lone filter is its own bottom.
+            Filter bottom = (Filter) filterChain.get(filterChain.size() - 1);
+
+            if (joinArgs.size() == 1) {
+                bottom.setArg(joinArgs.get(0));
+                return top;
+            }
+            TupleExpr finalJoinArg = joinArgs.remove(0);
+            bottom.setArg(leftDeepJoin(joinArgs));
+            return new Join(top, finalJoinArg);
+        }
+
+        /** Joins a non-empty list left to right; a single element is returned as is. */
+        private TupleExpr leftDeepJoin(List<TupleExpr> args) {
+            TupleExpr result = args.get(0);
+            for (int i = 1; i < args.size(); i++) {
+                result = new Join(result, args.get(i));
+            }
+            return result;
+        }
+
+        /**
+         * Bins each statement pattern with a constant predicate under both its subject and object
+         * variable names, then drops bins that are not valid star queries.
+         */
+        private HashMultimap<String, StatementPattern> getVarBins(List<TupleExpr> nodes) {
+
+            HashMultimap<String, StatementPattern> varMap = HashMultimap.create();
+
+            for (QueryModelNode node : nodes) {
+                if (node instanceof StatementPattern) {
+                    StatementPattern sp = (StatementPattern) node;
+                    if (sp.getPredicateVar().isConstant()) {
+                        varMap.put(sp.getSubjectVar().getName(), sp);
+                        varMap.put(sp.getObjectVar().getName(), sp);
+                    }
+                }
+            }
+
+            removeInvalidBins(varMap, true);
+
+            return varMap;
+        }
+
+        /**
+         * Removes the patterns of a consumed bin from every bin they appear in, then drops bins
+         * that are left with a single pattern.
+         */
+        private void updateVarMap(HashMultimap<String, StatementPattern> varMap, Set<StatementPattern> bin) {
+
+            for (StatementPattern sp : bin) {
+                varMap.remove(sp.getSubjectVar().getName(), sp);
+                varMap.remove(sp.getObjectVar().getName(), sp);
+            }
+
+            removeInvalidBins(varMap, false);
+
+        }
+
+        /**
+         * Drops bins that cannot form an entity query: for a freshly built map, bins failing
+         * {@code StarQuery.isValidStarQuery}; afterwards, bins with only one pattern left.
+         */
+        private void removeInvalidBins(HashMultimap<String, StatementPattern> varMap, boolean newMap) {
+
+            // Copy the keys and bins because the multimap is modified while iterating.
+            Set<String> keys = Sets.newHashSet(varMap.keySet());
+
+            if (newMap) {
+                for (String s : keys) {
+                    Set<StatementPattern> spSet = Sets.newHashSet(varMap.get(s));
+                    if (!StarQuery.isValidStarQuery(spSet)) {
+                        for (StatementPattern sp : spSet) {
+                            varMap.remove(s, sp);
+                        }
+                    }
+
+                }
+            } else {
+
+                for (String s : keys) {
+                    Set<StatementPattern> spSet = Sets.newHashSet(varMap.get(s));
+                    if (spSet.size() == 1) {
+                        for (StatementPattern sp : spSet) {
+                            varMap.remove(s, sp);
+                        }
+                    }
+
+                }
+            }
+
+        }
+
+        /**
+         * Replaces the patterns of bin {@code binName} in {@code joinArgs} with a single
+         * {@link EntityTupleSet} built from them, and removes them from {@code varMap}.
+         */
+        private void constructTuple(HashMultimap<String, StatementPattern> varMap, List<TupleExpr> joinArgs,
+                String binName) {
+
+            Set<StatementPattern> bin = Sets.newHashSet(varMap.get(binName));
+            StarQuery sq = new StarQuery(bin);
+
+            updateVarMap(varMap, bin);
+            for (StatementPattern sp : bin) {
+                joinArgs.remove(sp);
+            }
+
+            joinArgs.add(new EntityTupleSet(sq, conf));
+
+        }
+
+        /**
+         * Returns the bin key with the largest priority, where priority is the bin size times the
+         * bin's join estimate times its minimum single-pattern cardinality, boosted for bins whose
+         * common variable is a constant.
+         */
+        private String getHighestPriorityKey(HashMultimap<String, StatementPattern> varMap) {
+
+            double priority = -Double.MAX_VALUE;
+            String priorityKey = "";
+
+            for (String s : varMap.keySet()) {
+                Set<StatementPattern> bin = varMap.get(s);
+                double tempPriority = bin.size();
+                tempPriority *= getCardinality(bin);
+                tempPriority *= getMinCardSp(bin);
+
+                // weight starQuery where common Var is constant slightly more -- this factor is subject
+                // to change
+                if (s.startsWith("-const-")) {
+                    tempPriority *= 10;
+                }
+                if (tempPriority > priority) {
+                    priority = tempPriority;
+                    priorityKey = s;
+                }
+            }
+            return priorityKey;
+        }
+
+        /**
+         * Returns the smallest single-pattern cardinality reported by the selectivity DAO, or 1
+         * when no DAO is configured. Patterns whose lookup fails are skipped.
+         */
+        private double getMinCardSp(Collection<StatementPattern> nodes) {
+
+            if (eval == null) {
+                return 1;
+            }
+
+            double cardinality = Double.MAX_VALUE;
+            for (StatementPattern sp : nodes) {
+                try {
+                    double tempCard = eval.getCardinality(conf, sp);
+                    if (tempCard < cardinality) {
+                        cardinality = tempCard;
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+
+            return cardinality;
+
+        }
+
+        /**
+         * Returns the smallest pairwise join selectivity among the patterns, divided by the number
+         * of patterns plus one. The estimate needs the denormalized mode of
+         * {@link AccumuloSelectivityEvalDAO}; with no DAO or a different DAO type, a neutral factor of
+         * 1 is returned.
+         */
+        private double getCardinality(Collection<StatementPattern> spNodes) {
+
+            if (!(eval instanceof AccumuloSelectivityEvalDAO)) {
+                return 1;
+            }
+
+            double cardinality = Double.MAX_VALUE;
+            List<StatementPattern> nodes = Lists.newArrayList(spNodes);
+
+            AccumuloSelectivityEvalDAO ase = (AccumuloSelectivityEvalDAO) eval;
+            ase.setDenormalized(true);
+
+            try {
+                for (int i = 0; i < nodes.size(); i++) {
+                    for (int j = i + 1; j < nodes.size(); j++) {
+                        double tempCard = ase.getJoinSelect(conf, nodes.get(i), nodes.get(j));
+                        if (tempCard < cardinality) {
+                            cardinality = tempCard;
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                // Always restore the DAO's normal mode for other callers.
+                ase.setDenormalized(false);
+            }
+
+            return cardinality / (nodes.size() + 1);
+
+        }
+
+        /**
+         * Flattens nested joins into {@code joinArgs}. Each {@link Filter} is recorded and its
+         * argument is flattened as well. Unexpandable joins (see {@link #isUnexpandable(Join)})
+         * are kept whole as a single arg so they are not dropped from the rebuilt tree.
+         *
+         * @return {@code joinArgs}, for chaining
+         */
+        protected <L extends List<TupleExpr>> L getJoinArgs(TupleExpr tupleExpr, L joinArgs) {
+            if (tupleExpr instanceof Join && !isUnexpandable((Join) tupleExpr)) {
+                Join join = (Join) tupleExpr;
+                getJoinArgs(join.getLeftArg(), joinArgs);
+                getJoinArgs(join.getRightArg(), joinArgs);
+            } else if (tupleExpr instanceof Filter) {
+                joinArgs.add(tupleExpr);
+                getJoinArgs(((Filter) tupleExpr).getArg(), joinArgs);
+            } else {
+                joinArgs.add(tupleExpr);
+            }
+
+            return joinArgs;
+        }
+
+    }
+
+
+
+}