// File: extras/indexing/src/main/java/mvm/rya/indexing/StatementContraints.java
package mvm.rya.indexing;

/*
 * #%L
 * mvm.rya.indexing.accumulo
 * %%
 * Copyright (C) 2014 Rya
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.util.Set;

import org.openrdf.model.Resource;
import org.openrdf.model.URI;

/**
 * Optional filters applied to a secondary-index query: a subject, a set of
 * predicates, and/or a context. Any combination may be left unset (null).
 * Setters are fluent (return {@code this}) so constraints can be chained.
 *
 * <p>NOTE(review): class name misspells "Constraints"; kept as-is because it
 * is referenced throughout the indexing API.
 */
public class StatementContraints {

    private Resource subject = null;
    private Set<URI> predicates = null;
    private Resource context = null;

    /** Restrict matches to this subject. Fluent. */
    public StatementContraints setSubject(Resource subject) {
        this.subject = subject;
        return this;
    }

    /** @return the subject constraint, or null when unconstrained. */
    public Resource getSubject() {
        return subject;
    }

    /** @return true when a subject constraint has been set. */
    public boolean hasSubject() {
        return subject != null;
    }

    /** Restrict matches to these predicates. Fluent. */
    public StatementContraints setPredicates(Set<URI> predicates) {
        this.predicates = predicates;
        return this;
    }

    /** @return the predicate constraint set, or null when unconstrained. */
    public Set<URI> getPredicates() {
        return predicates;
    }

    /** @return true when a non-null, non-empty predicate set has been set. */
    public boolean hasPredicates() {
        return predicates != null && !predicates.isEmpty();
    }

    /** Restrict matches to this context (named graph). Fluent. */
    public StatementContraints setContext(Resource context) {
        this.context = context;
        return this;
    }

    /** @return the context constraint, or null when unconstrained. */
    public Resource getContext() {
        return context;
    }

    /** @return true when a context constraint has been set. */
    public boolean hasContext() {
        return context != null;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/indexing/TemporalIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/TemporalIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/TemporalIndexer.java new file mode 100644 index 0000000..f2c6892 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/TemporalIndexer.java @@ -0,0 +1,166 @@ +package mvm.rya.indexing; + +import info.aduna.iteration.CloseableIteration; + +import java.io.Closeable; +import java.io.Flushable; +import java.io.IOException; +import java.util.Collection; +import java.util.Set; + +import mvm.rya.api.persist.index.RyaSecondaryIndexer; + +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.query.QueryEvaluationException; + +/** + * A repository to store, index, and retrieve {@link Statement}s based on time. + * Instants: + * Instant {before, equals, after} Instant + * Instant {before, after, inside} Interval + * Instant {hasBeginning, hasEnd} Interval + * + * OWL-Time provides the interval relations: + * <pre> + * intervalEquals, + * intervalBefore, + * intervalMeets, + * intervalOverlaps, + * intervalStarts, + * intervalDuring, + * intervalFinishes, + * + * and their reverse interval relations: + * intervalAfter, + * intervalMetBy, + * intervalOverlappedBy, + * intervalStartedBy, + * intervalContains, + * intervalFinishedBy. 
+ * + * from Allen paper in 1983 + * + * Relation Y Symbol Inverse Y + * before Y < > X + * equal Y = = X + * meets Y m mi X + * overlaps Y o oi X + * during Y d di X + * starts Y s si X + * finishes Y f fi X + * </pre> + * + */ + +public interface TemporalIndexer extends RyaSecondaryIndexer { + + /* consider ParseException here */ + + /*- + * + * And Now, what you you've all been waiting for, the queries: + * the instant versions: + * format: x {relation} y + * read: Given literal y, find all statements where the date object x is ( x relation y ) + * Instant {before, equals, after} Instant + * Instant {before, after, inside} Interval + * Instant {hasBeginning, hasEnd} Interval + * + * the Allen interval relations, as described above. + * intervalEquals, + * intervalBefore, + * intervalMeets, + * intervalOverlaps, + * intervalStarts, + * intervalDuring, + * intervalFinishes + * and then the inverses, including after. + */ + + public abstract CloseableIteration<Statement, QueryEvaluationException> queryInstantEqualsInstant( + TemporalInstant queryInstant, StatementContraints contraints) + throws QueryEvaluationException;; + + public abstract CloseableIteration<Statement, QueryEvaluationException> queryInstantBeforeInstant( + TemporalInstant queryInstant, StatementContraints contraints) + throws QueryEvaluationException;; + + public abstract CloseableIteration<Statement, QueryEvaluationException> queryInstantAfterInstant( + TemporalInstant queryInstant, StatementContraints contraints) + throws QueryEvaluationException;; + + public abstract CloseableIteration<Statement, QueryEvaluationException> queryInstantBeforeInterval( + TemporalInterval givenInterval, StatementContraints contraints) + throws QueryEvaluationException;; + + public abstract CloseableIteration<Statement, QueryEvaluationException> queryInstantAfterInterval( + TemporalInterval givenInterval, StatementContraints contraints) + throws QueryEvaluationException; + + public abstract 
CloseableIteration<Statement, QueryEvaluationException> queryInstantInsideInterval( + TemporalInterval givenInterval, StatementContraints contraints) + throws QueryEvaluationException; + + public abstract CloseableIteration<Statement, QueryEvaluationException> queryInstantHasBeginningInterval( + TemporalInterval queryInterval, StatementContraints contraints) + throws QueryEvaluationException; + + public abstract CloseableIteration<Statement, QueryEvaluationException> queryInstantHasEndInterval( + TemporalInterval queryInterval, StatementContraints contraints) + throws QueryEvaluationException; + + /** + * Returns statements that contain a time instance that is equal to the + * queried time and meet the {@link StatementContraints}. + * + * @param query + * the queried time instance + * @param contraints + * the {@link StatementContraints} + * @return + * @throws QueryEvaluationException + */ + public abstract CloseableIteration<Statement, QueryEvaluationException> queryIntervalEquals( + TemporalInterval query, StatementContraints contraints) + throws QueryEvaluationException; + + /** + * Returns statements that contain a time instances that are before the + * queried {@link TemporalInterval} and meet the {@link StatementContraints} + * + * @param query + * the queried time instance + * @param contraints + * the {@link StatementContraints} + * @return + */ + public abstract CloseableIteration<Statement, QueryEvaluationException> queryIntervalBefore( + TemporalInterval query, StatementContraints contraints) + throws QueryEvaluationException; + + /** + * Returns statements that contain a time instance that is after the queried {@link TemporalInterval} and meet the {@link StatementContraints}. 
+ * + * @param query + * the queried time instance + * @param contraints + * the {@link StatementContraints} + * @return + */ + public abstract CloseableIteration<Statement, QueryEvaluationException> queryIntervalAfter( + TemporalInterval query, StatementContraints contraints) + throws QueryEvaluationException; + + /* End of the Allen algebra queries */ + /** + * @return the set of predicates indexed by the indexer. + */ + public abstract Set<URI> getIndexablePredicates(); + + @Override + public abstract void flush() throws IOException; + + @Override + public abstract void close() throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInstant.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInstant.java b/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInstant.java new file mode 100644 index 0000000..e988b70 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInstant.java @@ -0,0 +1,63 @@ +package mvm.rya.indexing; + +import java.io.Serializable; + +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; + +/** + * Time and date interface for building intervals. + * + *Implementations: + * Implementation should have a factory method for TemporalInterval since TemporalIntervals reference only this + * interface for begin & end, so it injects an implementation. + * public static TemporalInterval parseInterval(String dateTimeInterval) + * + * The following are notes and may not have been implemented. + * + * = rfc3339 + *https://www.ietf.org/rfc/rfc3339.txt + * a subset of ISO-8601 + * YYYY-MM-DDThh:mm:ss.fffZ + * Limits: + *All dates and times are assumed to be in the "current era", + somewhere between 0000AD and 9999AD. + * resolution: to the second, or millisecond if the optional fraction is used. 
+ * + * = epoch + * 32bit or 64bit integer specifying the number of seconds since a standard date-time (1970) + * 32bit is good until 2038. + * 64bit is good until after the heat death of our universe + * + */ +public interface TemporalInstant extends Comparable<TemporalInstant>, Serializable { + @Override + public boolean equals(Object obj) ; + + @Override + public int compareTo(TemporalInstant o) ; + + @Override + public int hashCode() ; + /** + * Get the date as a byte array. + */ + public byte[] getAsKeyBytes(); + /** + * Get the date as a String. + */ + public String getAsKeyString(); + /** + * Get the date as a human readable for reporting with timeZone. + */ + public String getAsReadable(DateTimeZone tz); + /** + * Get the date as a human readable for reporting, timeZone is implementation specific. + */ + public String getAsReadable(); + /** + * Get the date as a Joda/Java v8 DateTime. + */ + public DateTime getAsDateTime(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInterval.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInterval.java b/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInterval.java new file mode 100644 index 0000000..c8af18d --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInterval.java @@ -0,0 +1,161 @@ +package mvm.rya.indexing; + +import java.io.UnsupportedEncodingException; + +/** + * A time with beginning and end date and time, which could be indefinitely in + * the past or future. Immutable, so it's thread safe. For use in reading and + * writing from Rya's temporal indexing scheme. + * + */ +public class TemporalInterval implements Comparable<TemporalInterval> { + + // the beginning and end. Read-only because they are final references to immutable objects. 
+ private final TemporalInstant hasBeginning; + private final TemporalInstant hasEnd; + + /** + * Separate the beginning and end with this. + * Used because Joda time library's interval uses this. + * TODO: Move this down to the TemporalInterval implementation. + * TODO: Then add a TemporalInterval.keyConcatenate(). + */ + public static final String DELIMITER = "/"; + +// /** +// * Empty constructor -- not allowed, no defaults. +// * For an infinite span of time: do it like this: +// * new TemporalInterval(TemporalInstantImpl.getMinimum, TemporalInstantImpl.getMaximum) +// */ +// public TemporalInterval() { +// hasBeginning = null; +// hasEnd = null; +// } + + /** + * Constructor setting beginning and end with an implementation of {@link TemporalInstant}. + * beginning must be less than end. + * + * @param hasBeginning + * @param hasEnd + */ + public TemporalInterval(TemporalInstant hasBeginning, TemporalInstant hasEnd) { + super(); + if (hasBeginning != null && hasEnd != null && 0 < hasBeginning.compareTo(hasEnd)) + throw new IllegalArgumentException("The Beginning instance must not compare greater than the end."); + this.hasBeginning = hasBeginning; + this.hasEnd = hasEnd; + } + + /** + * @return the hasBeginning + */ + public TemporalInstant getHasBeginning() { + return hasBeginning; + } + + /** + * @return the hasEnd + */ + public TemporalInstant getHasEnd() { + return hasEnd; + } + + /** + * True if CompareTo() says equal (0) + */ + @Override + public boolean equals(Object other) { + return other instanceof TemporalInterval + && this.compareTo((TemporalInterval) other) == 0; + }; + + /** + * Compare beginnings, if the same then compare ends, or equal if beginnings equal and endings equal. + * Nulls represent infinity. 
+ */ + @Override + public int compareTo(TemporalInterval other) { + int compBegins = this.hasBeginning.compareTo(other.hasBeginning); + if (0 == compBegins) + return this.hasEnd.compareTo(other.hasEnd); + else + return compBegins; + + } + + /** + * Hashcode for + */ + @Override + public int hashCode() { + if (hasBeginning == null) + if (hasEnd == null) + return 0; + else + return hasEnd.hashCode(); + else + return hashboth(this.hasBeginning.hashCode(), + this.hasEnd.hashCode()); + } + + /** + * Hashcode combining two string hashcodes. + */ + protected static int hashboth(int i1, int i2) { + // return (int) (( 1L * i1 * i2) ; % (1L + Integer.MAX_VALUE)); + // let the overflow happen. It won't throw an error. + return (i1 + i2); + } + + /** + * Get the key use for rowid for the beginning of the interval. Use ascii + * for conversion to catch and prevent multi-byte chars. + * + * @return + */ + public byte[] getAsKeyBeginning() { + try { + return (hasBeginning.getAsKeyString() + DELIMITER + hasEnd + .getAsKeyString()).getBytes("US-ASCII"); + } catch (UnsupportedEncodingException e) { + // this is a code error, the strings are mostly numbers. + throw new Error("while converting key string to ascii bytes", e); + } + } + + /** + * get the key used for indexing the end of the interval. Use ascii for + * conversion to catch and prevent multi-byte chars. + * + * @return + */ + public byte[] getAsKeyEnd() { + try { + return (hasEnd.getAsKeyString() + DELIMITER + hasBeginning + .getAsKeyString()).getBytes("US-ASCII"); + } catch (UnsupportedEncodingException e) { + // this is a code error, the strings are mostly numbers and ascii + // symbols. + throw new Error("while converting key string to ascii bytes", e); + } + } + + /** + * Format as a "period" in this paper. This is not a standard, really. 
+ * http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.298.8948&rep=rep1&type=pdf + * also consider using the typed literal syntax: + * "[2010-01-01,2010-01-31]"^^xs:period + * @return [begindate,enddate] for example: [2010-01-01,2010-01-31] + * + */ + public String getAsPair() { + return "["+hasBeginning.getAsReadable() + "," + hasEnd.getAsReadable() + "]"; + } + + @Override + public String toString() { + return getAsPair() ; + // return hasBeginning.getAsReadable() + DELIMITER + hasEnd.getAsReadable(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java new file mode 100644 index 0000000..aefd3f5 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java @@ -0,0 +1,425 @@ +package mvm.rya.indexing.accumulo; + +/* + * #%L + * mvm.rya.indexing.accumulo + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.experimental.AccumuloIndexer; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.indexing.FilterFunctionOptimizer; +import mvm.rya.indexing.accumulo.entity.EntityCentricIndex; +import mvm.rya.indexing.accumulo.entity.EntityOptimizer; +import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; +import mvm.rya.indexing.accumulo.freetext.LuceneTokenizer; +import mvm.rya.indexing.accumulo.freetext.Tokenizer; +import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer; +import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; +import mvm.rya.indexing.external.PrecompJoinOptimizer; +import mvm.rya.indexing.mongodb.MongoGeoIndexer; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.commons.lang.Validate; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.log4j.Logger; +import org.openrdf.model.URI; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.algebra.evaluation.QueryOptimizer; + 
+import com.google.common.collect.Lists; + +/** + * A set of configuration utils to read a Hadoop {@link Configuration} object and create Cloudbase/Accumulo objects. + */ +public class ConfigUtils { + private static final Logger logger = Logger.getLogger(ConfigUtils.class); + + public static final String CLOUDBASE_TBL_PREFIX = "sc.cloudbase.tableprefix"; + public static final String CLOUDBASE_AUTHS = "sc.cloudbase.authorizations"; + public static final String CLOUDBASE_INSTANCE = "sc.cloudbase.instancename"; + public static final String CLOUDBASE_ZOOKEEPERS = "sc.cloudbase.zookeepers"; + public static final String CLOUDBASE_USER = "sc.cloudbase.username"; + public static final String CLOUDBASE_PASSWORD = "sc.cloudbase.password"; + + public static final String CLOUDBASE_WRITER_MAX_WRITE_THREADS = "sc.cloudbase.writer.maxwritethreads"; + public static final String CLOUDBASE_WRITER_MAX_LATENCY = "sc.cloudbase.writer.maxlatency"; + public static final String CLOUDBASE_WRITER_MAX_MEMORY = "sc.cloudbase.writer.maxmemory"; + + public static final String FREE_TEXT_QUERY_TERM_LIMIT = "sc.freetext.querytermlimit"; + + public static final String FREE_TEXT_DOC_TABLENAME = "sc.freetext.doctable"; + public static final String FREE_TEXT_TERM_TABLENAME = "sc.freetext.termtable"; + public static final String GEO_TABLENAME = "sc.geo.table"; + public static final String GEO_NUM_PARTITIONS = "sc.geo.numPartitions"; + public static final String TEMPORAL_TABLENAME = "sc.temporal.index"; + public static final String ENTITY_TABLENAME = "sc.entity.index"; + + public static final String USE_GEO = "sc.use_geo"; + public static final String USE_FREETEXT = "sc.use_freetext"; + public static final String USE_TEMPORAL = "sc.use_temporal"; + public static final String USE_ENTITY = "sc.use_entity"; + public static final String USE_PCJ = "sc.use_pcj"; + public static final String USE_OPTIMAL_PCJ = "sc.use.optimal.pcj"; + + public static final String USE_INDEXING_SAIL = "sc.use.indexing.sail"; + 
public static final String USE_EXTERNAL_SAIL = "sc.use.external.sail"; + + public static final String USE_MOCK_INSTANCE = ".useMockInstance"; + + public static final String NUM_PARTITIONS = "sc.cloudbase.numPartitions"; + + private static final int WRITER_MAX_WRITE_THREADS = 1; + private static final long WRITER_MAX_LATNECY = Long.MAX_VALUE; + private static final long WRITER_MAX_MEMORY = 10000L; + + public static final String DISPLAY_QUERY_PLAN = "query.printqueryplan"; + + public static final String FREETEXT_PREDICATES_LIST = "sc.freetext.predicates"; + public static final String FREETEXT_DOC_NUM_PARTITIONS = "sc.freetext.numPartitions.text"; + public static final String FREETEXT_TERM_NUM_PARTITIONS = "sc.freetext.numPartitions.term"; + + public static final String TOKENIZER_CLASS = "sc.freetext.tokenizer.class"; + + public static final String GEO_PREDICATES_LIST = "sc.geo.predicates"; + + public static final String TEMPORAL_PREDICATES_LIST = "sc.temporal.predicates"; + + public static final String USE_MONGO = "sc.useMongo"; + + public static boolean isDisplayQueryPlan(Configuration conf){ + return conf.getBoolean(DISPLAY_QUERY_PLAN, false); + } + + /** + * get a value from the configuration file and throw an exception if the value does not exist. 
+ * + * @param conf + * @param key + * @return + */ + private static String getStringCheckSet(Configuration conf, String key) { + String value = conf.get(key); + Validate.notNull(value, key + " not set"); + return value; + } + + /** + * @param conf + * @param tablename + * @return if the table was created + * @throws AccumuloException + * @throws AccumuloSecurityException + * @throws TableExistsException + */ + public static boolean createTableIfNotExists(Configuration conf, String tablename) throws AccumuloException, AccumuloSecurityException, + TableExistsException { + TableOperations tops = getConnector(conf).tableOperations(); + if (!tops.exists(tablename)) { + logger.info("Creating table: " + tablename); + tops.create(tablename); + return true; + } + return false; + } + + private static String getIndexTableName(Configuration conf, String indexTableNameConf, String altSuffix){ + String value = conf.get(indexTableNameConf); + if (value == null){ + String defaultTableName = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX); + Validate.notNull(defaultTableName, indexTableNameConf + " not set and " + RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX + " not set. 
Cannot generate table name."); + value = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX) + altSuffix; + } + return value; + } + + public static String getFreeTextDocTablename(Configuration conf) { + return getIndexTableName(conf, FREE_TEXT_DOC_TABLENAME, "freetext"); + } + + public static String getFreeTextTermTablename(Configuration conf) { + return getIndexTableName(conf, FREE_TEXT_TERM_TABLENAME, "freetext_term"); + } + + public static int getFreeTextTermLimit(Configuration conf) { + return conf.getInt(FREE_TEXT_QUERY_TERM_LIMIT, 100); + } + + public static String getGeoTablename(Configuration conf) { + return getIndexTableName(conf, GEO_TABLENAME, "geo"); + } + + public static String getTemporalTableName(Configuration conf) { + return getIndexTableName(conf, TEMPORAL_TABLENAME, "temporal"); + } + + + public static String getEntityTableName(Configuration conf) { + return getIndexTableName(conf, ENTITY_TABLENAME, "entity"); + } + + + public static Set<URI> getFreeTextPredicates(Configuration conf) { + return getPredicates(conf, FREETEXT_PREDICATES_LIST); + } + + public static Set<URI> getGeoPredicates(Configuration conf) { + return getPredicates(conf, GEO_PREDICATES_LIST); + } + /** + * Used for indexing statements about date & time instances and intervals. + * @param conf + * @return Set of predicate URI's whose objects should be date time literals. + */ + public static Set<URI> getTemporalPredicates(Configuration conf) { + return getPredicates(conf, TEMPORAL_PREDICATES_LIST); + } + + private static Set<URI> getPredicates(Configuration conf, String confName) { + String[] validPredicateStrings = conf.getStrings(confName, new String[] {}); + Set<URI> predicates = new HashSet<URI>(); + for (String prediateString : validPredicateStrings) { + predicates.add(new URIImpl(prediateString)); + } + return predicates; + } + + public static Tokenizer getFreeTextTokenizer(Configuration conf) { + Class<? 
extends Tokenizer> c = conf.getClass(TOKENIZER_CLASS, LuceneTokenizer.class, Tokenizer.class); + return ReflectionUtils.newInstance(c, conf); + } + + public static BatchWriter createDefaultBatchWriter(String tablename, Configuration conf) throws TableNotFoundException, + AccumuloException, AccumuloSecurityException { + Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf); + Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf); + Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf); + Connector connector = ConfigUtils.getConnector(conf); + return connector.createBatchWriter(tablename, DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS); + } + + public static MultiTableBatchWriter createMultitableBatchWriter(Configuration conf) throws AccumuloException, AccumuloSecurityException { + Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf); + Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf); + Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf); + Connector connector = ConfigUtils.getConnector(conf); + return connector.createMultiTableBatchWriter(DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS); + } + + public static Scanner createScanner(String tablename, Configuration conf) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException { + Connector connector = ConfigUtils.getConnector(conf); + Authorizations auths = ConfigUtils.getAuthorizations(conf); + return connector.createScanner(tablename, auths); + + } + + public static BatchScanner createBatchScanner(String tablename, Configuration conf) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException { + Connector connector = ConfigUtils.getConnector(conf); + Authorizations auths = ConfigUtils.getAuthorizations(conf); + Integer numThreads = null; + if (conf instanceof RdfCloudTripleStoreConfiguration) + numThreads = ((RdfCloudTripleStoreConfiguration) conf).getNumThreads(); + else + numThreads = 
conf.getInt(RdfCloudTripleStoreConfiguration.CONF_NUM_THREADS, 2); + return connector.createBatchScanner(tablename, auths, numThreads); + } + + public static int getWriterMaxWriteThreads(Configuration conf) { + return conf.getInt(CLOUDBASE_WRITER_MAX_WRITE_THREADS, WRITER_MAX_WRITE_THREADS); + } + + public static long getWriterMaxLatency(Configuration conf) { + return conf.getLong(CLOUDBASE_WRITER_MAX_LATENCY, WRITER_MAX_LATNECY); + } + + public static long getWriterMaxMemory(Configuration conf) { + return conf.getLong(CLOUDBASE_WRITER_MAX_MEMORY, WRITER_MAX_MEMORY); + } + + public static String getUsername(JobContext job) { + return getUsername(job.getConfiguration()); + } + + public static String getUsername(Configuration conf) { + return conf.get(CLOUDBASE_USER); + } + + public static Authorizations getAuthorizations(JobContext job) { + return getAuthorizations(job.getConfiguration()); + } + + public static Authorizations getAuthorizations(Configuration conf) { + String authString = conf.get(CLOUDBASE_AUTHS, ""); + if (authString.isEmpty()) { + return new Authorizations(); + } + return new Authorizations(authString.split(",")); + } + + public static Instance getInstance(JobContext job) { + return getInstance(job.getConfiguration()); + } + + public static Instance getInstance(Configuration conf) { + if (useMockInstance(conf)) { + return new MockInstance(conf.get(CLOUDBASE_INSTANCE)); + } + return new ZooKeeperInstance(conf.get(CLOUDBASE_INSTANCE), conf.get(CLOUDBASE_ZOOKEEPERS)); + } + + public static String getPassword(JobContext job) { + return getPassword(job.getConfiguration()); + } + + public static String getPassword(Configuration conf) { + return conf.get(CLOUDBASE_PASSWORD, ""); + } + + public static Connector getConnector(JobContext job) throws AccumuloException, AccumuloSecurityException { + return getConnector(job.getConfiguration()); + } + + public static Connector getConnector(Configuration conf) throws AccumuloException, AccumuloSecurityException { 
+ Instance instance = ConfigUtils.getInstance(conf); + + return instance.getConnector(getUsername(conf), getPassword(conf)); + } + + public static boolean useMockInstance(Configuration conf) { + return conf.getBoolean(USE_MOCK_INSTANCE, false); + } + + private static int getNumPartitions(Configuration conf) { + return conf.getInt(NUM_PARTITIONS, 25); + } + + public static int getFreeTextDocNumPartitions(Configuration conf) { + return conf.getInt(FREETEXT_DOC_NUM_PARTITIONS, getNumPartitions(conf)); + } + + public static int getFreeTextTermNumPartitions(Configuration conf) { + return conf.getInt(FREETEXT_TERM_NUM_PARTITIONS, getNumPartitions(conf)); + } + + public static int getGeoNumPartitions(Configuration conf) { + return conf.getInt(GEO_NUM_PARTITIONS, getNumPartitions(conf)); + } + + public static boolean getUseGeo(Configuration conf) { + return conf.getBoolean(USE_GEO, false); + } + + public static boolean getUseFreeText(Configuration conf) { + return conf.getBoolean(USE_FREETEXT, false); + } + + public static boolean getUseTemporal(Configuration conf) { + return conf.getBoolean(USE_TEMPORAL, false); + } + + public static boolean getUseEntity(Configuration conf) { + return conf.getBoolean(USE_ENTITY, false); + } + + public static boolean getUsePCJ(Configuration conf) { + return conf.getBoolean(USE_PCJ, false); + } + + public static boolean getUseOptimalPCJ(Configuration conf) { + return conf.getBoolean(USE_OPTIMAL_PCJ, false); + } + + public static boolean getUseMongo(Configuration conf) { + return conf.getBoolean(USE_MONGO, false); + } + + + public static void setIndexers(RdfCloudTripleStoreConfiguration conf) { + + List<String> indexList = Lists.newArrayList(); + List<String> optimizers = Lists.newArrayList(); + + boolean useFilterIndex = false; + + if (ConfigUtils.getUseMongo(conf)) { + if (getUseGeo(conf)) { + indexList.add(MongoGeoIndexer.class.getName()); + useFilterIndex = true; + } + } else { + + if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) { + 
conf.setPcjOptimizer(PrecompJoinOptimizer.class); + } + + if (getUseGeo(conf)) { + indexList.add(GeoMesaGeoIndexer.class.getName()); + useFilterIndex = true; + } + + if (getUseFreeText(conf)) { + indexList.add(AccumuloFreeTextIndexer.class.getName()); + useFilterIndex = true; + } + + if (getUseTemporal(conf)) { + indexList.add(AccumuloTemporalIndexer.class.getName()); + useFilterIndex = true; + } + + } + + if (useFilterIndex) { + optimizers.add(FilterFunctionOptimizer.class.getName()); + } + + if (getUseEntity(conf)) { + indexList.add(EntityCentricIndex.class.getName()); + optimizers.add(EntityOptimizer.class.getName()); + + } + + conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[]{})); + conf.setStrings(AccumuloRdfConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[]{})); + + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/Md5Hash.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/Md5Hash.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/Md5Hash.java new file mode 100644 index 0000000..8d350bf --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/Md5Hash.java @@ -0,0 +1,44 @@ +package mvm.rya.indexing.accumulo; + +/* + * #%L + * mvm.rya.indexing.accumulo + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import org.apache.accumulo.core.data.Value;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.commons.codec.digest.DigestUtils;

/**
 * Utility methods for generating hashes. Note that MD5 is 16 bytes, or 32 Hex chars. To make it smaller (but still printable), this class
 * Base64 encodes those 16 bytes into 22 chars.
 */
public class Md5Hash {

    /**
     * @param data bytes to digest
     * @return the URL-safe Base64 encoding (22 chars, no padding) of the MD5 digest of {@code data}
     */
    public static String md5Base64(byte[] data) {
        return Base64.encodeBase64URLSafeString(DigestUtils.md5(data));
    }

    /** Convenience overload: digests the UTF-8 encoding of {@code string}. */
    public static String md5Base64(String string) {
        return md5Base64(StringUtils.getBytesUtf8(string));
    }

    /** @return the raw 16-byte MD5 digest of the {@link Value}'s backing byte array. */
    public static byte[] md5Binary(Value value) {
        return DigestUtils.md5(value.get());
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/StatementSerializer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/StatementSerializer.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/StatementSerializer.java new file mode 100644 index 0000000..773ee39 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/StatementSerializer.java @@ -0,0 +1,226 @@
package mvm.rya.indexing.accumulo;

/*
 * #%L
 * mvm.rya.indexing.accumulo
 * %%
 * Copyright (C) 2014 Rya
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.io.IOException;
import java.util.Set;

import mvm.rya.indexing.StatementContraints;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.openrdf.model.Literal;
import org.openrdf.model.Resource;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ContextStatementImpl;
import org.openrdf.model.impl.StatementImpl;
import org.openrdf.model.impl.ValueFactoryImpl;

/**
 * A set of Utilities to serialize {@link Statement}s to/from {@link String}s.
+ */ +public class StatementSerializer { + private static String SEP = "\u0000"; + + private static ValueFactory VALUE_FACTORY = new ValueFactoryImpl(); + + /** + * Read a {@link Statement} from a {@link String} + * + * @param in + * the {@link String} to parse + * @return a {@link Statement} + */ + public static Statement readStatement(String in) throws IOException { + String[] parts = in.split(SEP); + + if (parts.length != 4) { + throw new IOException("Not a valid statement: " + in); + } + + String contextString = parts[0]; + String subjectString = parts[1]; + String predicateString = parts[2]; + String objectString = parts[3]; + return readStatement(subjectString, predicateString, objectString, contextString); + } + + public static Statement readStatement(String subjectString, String predicateString, String objectString) { + return readStatement(subjectString, predicateString, objectString, ""); + } + + public static Statement readStatement(String subjectString, String predicateString, String objectString, String contextString) { + Resource subject = createResource(subjectString); + URI predicate = VALUE_FACTORY.createURI(predicateString); + + boolean isObjectLiteral = objectString.startsWith("\""); + + Value object = null; + if (isObjectLiteral) { + object = parseLiteral(objectString); + } else { + object = createResource(objectString); + } + + if (contextString == null || contextString.isEmpty()) { + return new StatementImpl(subject, predicate, object); + } else { + Resource context = VALUE_FACTORY.createURI(contextString); + return new ContextStatementImpl(subject, predicate, object, context); + } + } + + private static Resource createResource(String str) { + if (str.startsWith("_")) { + return VALUE_FACTORY.createBNode(str.substring(2)); + } + return VALUE_FACTORY.createURI(str); + + } + + private static Literal parseLiteral(String fullLiteralString) { + Validate.notNull(fullLiteralString); + Validate.isTrue(fullLiteralString.length() > 1); + + if 
(fullLiteralString.endsWith("\"")) { + String fullLiteralWithoutQuotes = fullLiteralString.substring(1, fullLiteralString.length() - 1); + return VALUE_FACTORY.createLiteral(fullLiteralWithoutQuotes, (String) null); + } else { + + // find the closing quote + int labelEnd = fullLiteralString.lastIndexOf("\""); + + String label = fullLiteralString.substring(1, labelEnd); + + String data = fullLiteralString.substring(labelEnd + 1); + + if (data.startsWith("@")) { + // the data is "language" + String lang = data.substring(1); + return VALUE_FACTORY.createLiteral(label, lang); + } else if (data.startsWith("^^<")) { + // the data is a "datatype" + String datatype = data.substring(3, data.length() - 1); + URI datatypeUri = VALUE_FACTORY.createURI(datatype); + return VALUE_FACTORY.createLiteral(label, datatypeUri); + } + } + return null; + + } + + public static String writeSubject(Statement statement) { + return statement.getSubject().toString(); + } + + public static String writeObject(Statement statement) { + return statement.getObject().toString(); + } + + public static String writePredicate(Statement statement) { + return statement.getPredicate().toString(); + } + + public static String writeSubjectPredicate(Statement statement) { + Validate.notNull(statement); + Validate.notNull(statement.getSubject()); + Validate.notNull(statement.getPredicate()); + return statement.getSubject().toString() + SEP + statement.getPredicate().toString(); + } + + public static String writeContext(Statement statement) { + if (statement.getContext() == null) { + return ""; + } + return statement.getContext().toString(); + } + + /** + * Write a {@link Statement} to a {@link String} + * + * @param statement + * the {@link Statement} to write + * @return a {@link String} representation of the statement + */ + public static String writeStatement(Statement statement) { + Resource subject = statement.getSubject(); + Resource context = statement.getContext(); + URI predicate = 
statement.getPredicate(); + Value object = statement.getObject(); + + Validate.notNull(subject); + Validate.notNull(predicate); + Validate.notNull(object); + + String s = ""; + if (context == null) { + s = SEP + subject.toString() + SEP + predicate.toString() + SEP + object.toString(); + } else { + s = context.toString() + SEP + subject.toString() + SEP + predicate.toString() + SEP + object.toString(); + } + return s; + } + + /** + * Creates a Regular Expression to match serialized statements meeting these constraints. A <code>null</code> or empty parameters imply + * no constraint. A <code>null</code> return value implies no constraints. + * + * @param context + * context constraint + * @param subject + * subject constraint + * @param predicates + * list of predicate constraints + * @return a regular expression that can be used to match serialized statements. A <code>null</code> return value implies no + * constraints. + */ + public static String createStatementRegex(StatementContraints contraints) { + Resource context = contraints.getContext(); + Resource subject = contraints.getSubject(); + Set<URI> predicates = contraints.getPredicates(); + if (context == null && subject == null && (predicates == null || predicates.isEmpty())) { + return null; + } + + // match on anything but a separator + String anyReg = "[^" + SEP + "]*"; + + // if context is empty, match on any context + String contextReg = (context == null) ? anyReg : context.stringValue(); + + // if subject is empty, match on any subject + String subjectReg = (subject == null) ? anyReg : subject.stringValue(); + + // if the predicates are empty, match on any predicate. Otherwise, "or" the predicates. 
+ String predicateReg = ""; + if (predicates == null || predicates.isEmpty()) { + predicateReg = anyReg; + } else { + predicateReg = "(" + StringUtils.join(predicates, "|") + ")"; + } + + return "^" + contextReg + SEP + subjectReg + SEP + predicateReg + SEP + ".*"; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java new file mode 100644 index 0000000..f3eb282 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java @@ -0,0 +1,430 @@ +package mvm.rya.indexing.accumulo.entity; + +import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTE; +import static mvm.rya.api.RdfCloudTripleStoreConstants.TYPE_DELIM_BYTE; +import info.aduna.iteration.CloseableIteration; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.Set; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.documentIndex.DocIndexIteratorUtil; +import mvm.rya.accumulo.documentIndex.DocumentIndexIntersectingIterator; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.resolver.RyaContext; +import mvm.rya.api.resolver.RyaToRdfConversions; +import mvm.rya.api.resolver.RyaTypeResolverException; +import mvm.rya.indexing.DocIdIndexer; +import mvm.rya.indexing.accumulo.ConfigUtils; + +import org.apache.accumulo.core.client.AccumuloException; +import 
org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.openrdf.query.BindingSet;
import org.openrdf.query.MalformedQueryException;
import org.openrdf.query.QueryEvaluationException;
import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.algebra.TupleExpr;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;
import org.openrdf.query.algebra.helpers.StatementPatternCollector;
import org.openrdf.query.parser.ParsedQuery;
import org.openrdf.query.parser.sparql.SPARQLParser;

import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Sets;
import com.google.common.primitives.Bytes;

/**
 * Evaluates "star queries" (several statement patterns sharing one common
 * variable) against the entity-centric document index, joining the scan
 * results with the caller-supplied constraint binding sets.
 */
public class AccumuloDocIdIndexer implements DocIdIndexer {

    private BatchScanner bs;
    private AccumuloRdfConfiguration conf;

    public AccumuloDocIdIndexer(RdfCloudTripleStoreConfiguration conf) throws AccumuloException, AccumuloSecurityException {
        // FIX: the original checked "conf instanceof RdfCloudTripleStoreConfiguration",
        // which is statically always true for this parameter type, and then cast to
        // AccumuloRdfConfiguration anyway (possible ClassCastException). Check the
        // type the cast actually requires.
        Preconditions.checkArgument(conf instanceof AccumuloRdfConfiguration, "conf must be instance of AccumuloRdfConfiguration");
        this.conf = (AccumuloRdfConfiguration) conf;
        //Connector conn = ConfigUtils.getConnector(conf);
    }

    /**
     * Parses {@code sparqlQuery}, validates that it is a star query, and
     * delegates to {@link #queryDocIndex(StarQuery, Collection)}.
     *
     * @throws QueryEvaluationException if the SPARQL is malformed (the original
     *             printed the stack trace and then dereferenced the null query)
     * @throws IllegalArgumentException if the query is not a valid star query
     */
    public CloseableIteration<BindingSet, QueryEvaluationException> queryDocIndex(String sparqlQuery,
            Collection<BindingSet> constraints) throws TableNotFoundException, QueryEvaluationException {

        SPARQLParser parser = new SPARQLParser();
        ParsedQuery pq1 = null;
        try {
            pq1 = parser.parseQuery(sparqlQuery, null);
        } catch (MalformedQueryException e) {
            // FIX: was e.printStackTrace() followed by an NPE on pq1 below.
            throw new QueryEvaluationException(e);
        }

        TupleExpr te1 = pq1.getTupleExpr();
        List<StatementPattern> spList1 = StatementPatternCollector.process(te1);

        if (StarQuery.isValidStarQuery(spList1)) {
            StarQuery sq1 = new StarQuery(spList1);
            return queryDocIndex(sq1, constraints);
        } else {
            throw new IllegalArgumentException("Invalid star query!");
        }

    }

    @Override
    public CloseableIteration<BindingSet, QueryEvaluationException> queryDocIndex(StarQuery query,
            Collection<BindingSet> constraints) throws TableNotFoundException, QueryEvaluationException {

        final StarQuery starQ = query;
        // NOTE: this local "bs" (an Iterator over constraints) shadows the
        // BatchScanner field of the same name.
        final Iterator<BindingSet> bs = constraints.iterator();
        final Iterator<BindingSet> bs2 = constraints.iterator();
        final Set<String> unCommonVarNames;
        final Set<String> commonVarNames;
        if (bs2.hasNext()) {
            BindingSet currBs = bs2.next();
            commonVarNames = StarQuery.getCommonVars(query, currBs);
            unCommonVarNames = Sets.difference(currBs.getBindingNames(), commonVarNames);
        } else {
            commonVarNames = Sets.newHashSet();
            unCommonVarNames = Sets.newHashSet();
        }

        if (commonVarNames.size() == 1 && !query.commonVarConstant() && commonVarNames.contains(query.getCommonVarName())) {

            // Fast path: the only shared variable is the star query's common
            // variable, so each constraint pins a single row. Scan just those
            // rows and join the results back through the row -> BindingSet map.
            final HashMultimap<String, BindingSet> map = HashMultimap.create();
            final String commonVar = starQ.getCommonVarName();
            final Iterator<Entry<Key, Value>> intersections;
            final BatchScanner scan;
            Set<Range> ranges = Sets.newHashSet();

            while (bs.hasNext()) {

                BindingSet currentBs = bs.next();

                if (currentBs.getBinding(commonVar) == null) {
                    continue;
                }

                String row = currentBs.getBinding(commonVar).getValue().stringValue();
                ranges.add(new Range(row));
                map.put(row, currentBs);

            }
            scan = runQuery(starQ, ranges);
            intersections = scan.iterator();

            return new CloseableIteration<BindingSet, QueryEvaluationException>() {

                private QueryBindingSet currentSolutionBs = null;
                private boolean hasNextCalled = false;
                private boolean isEmpty = false;
                private Iterator<BindingSet> inputSet = (new ArrayList<BindingSet>()).iterator();
                private BindingSet currentBs;
                private Key key;

                @Override
                public boolean hasNext() throws QueryEvaluationException {
                    if (!hasNextCalled && !isEmpty) {
                        while (inputSet.hasNext() || intersections.hasNext()) {
                            if (!inputSet.hasNext()) {
                                key = intersections.next().getKey();
                                inputSet = map.get(key.getRow().toString()).iterator();
                            }
                            currentBs = inputSet.next();
                            currentSolutionBs = deserializeKey(key, starQ, currentBs, unCommonVarNames);

                            // only solutions binding every expected variable count
                            if (currentSolutionBs.size() == unCommonVarNames.size() + starQ.getUnCommonVars().size() + 1) {
                                hasNextCalled = true;
                                return true;
                            }

                        }

                        isEmpty = true;
                        return false;

                    } else if (isEmpty) {
                        return false;
                    } else {
                        return true;
                    }

                }

                @Override
                public BindingSet next() throws QueryEvaluationException {

                    if (hasNextCalled) {
                        hasNextCalled = false;
                    } else if (isEmpty) {
                        throw new NoSuchElementException();
                    } else {
                        if (this.hasNext()) {
                            hasNextCalled = false;
                        } else {
                            throw new NoSuchElementException();
                        }
                    }

                    return currentSolutionBs;
                }

                @Override
                public void remove() throws QueryEvaluationException {
                    throw new UnsupportedOperationException();
                }

                @Override
                public void close() throws QueryEvaluationException {
                    scan.close();
                }

            };

        } else {

            // General path: re-run a constrained scan per constraint binding set.
            return new CloseableIteration<BindingSet, QueryEvaluationException>() {

                @Override
                public void remove() throws QueryEvaluationException {
                    throw new UnsupportedOperationException();
                }

                private Iterator<Entry<Key, Value>> intersections = null;
                private QueryBindingSet currentSolutionBs = null;
                private boolean hasNextCalled = false;
                private boolean isEmpty = false;
                private boolean init = false;
                private BindingSet currentBs;
                private StarQuery sq = new StarQuery(starQ);
                private Set<Range> emptyRangeSet = Sets.newHashSet();
                private BatchScanner scan;

                @Override
                public BindingSet next() throws QueryEvaluationException {
                    if (hasNextCalled) {
                        hasNextCalled = false;
                    } else if (isEmpty) {
                        throw new NoSuchElementException();
                    } else {
                        if (this.hasNext()) {
                            hasNextCalled = false;
                        } else {
                            throw new NoSuchElementException();
                        }
                    }
                    return currentSolutionBs;
                }

                @Override
                public boolean hasNext() throws QueryEvaluationException {

                    if (!init) {
                        if (intersections == null && bs.hasNext()) {
                            currentBs = bs.next();
                            sq = StarQuery.getConstrainedStarQuery(sq, currentBs);
                            scan = runQuery(sq, emptyRangeSet);
                            intersections = scan.iterator();
                            // binding set empty
                        } else if (intersections == null && !bs.hasNext()) {
                            currentBs = new QueryBindingSet();
                            scan = runQuery(starQ, emptyRangeSet);
                            intersections = scan.iterator();
                        }

                        init = true;
                    }

                    if (!hasNextCalled && !isEmpty) {
                        while (intersections.hasNext() || bs.hasNext()) {
                            if (!intersections.hasNext()) {
                                // current scan exhausted: move to the next constraint
                                scan.close();
                                currentBs = bs.next();
                                sq = StarQuery.getConstrainedStarQuery(sq, currentBs);
                                scan = runQuery(sq, emptyRangeSet);
                                intersections = scan.iterator();
                            }
                            if (intersections.hasNext()) {
                                currentSolutionBs = deserializeKey(intersections.next().getKey(), sq, currentBs,
                                        unCommonVarNames);
                            } else {
                                continue;
                            }

                            if (sq.commonVarConstant() && currentSolutionBs.size() == unCommonVarNames.size() + sq.getUnCommonVars().size()) {
                                hasNextCalled = true;
                                return true;
                            } else if (currentSolutionBs.size() == unCommonVarNames.size() + sq.getUnCommonVars().size() + 1) {
                                hasNextCalled = true;
                                return true;
                            }
                        }

                        isEmpty = true;
                        return false;

                    } else if (isEmpty) {
                        return false;
                    } else {
                        return true;
                    }
                }

                @Override
                public void close() throws QueryEvaluationException {
                    scan.close();
                }
            };
        }
    }

    /**
     * Rebuilds a solution {@link QueryBindingSet} from an index entry: the row
     * supplies the common variable, the column qualifier supplies each
     * un-common variable, and {@code currentBs} supplies pass-through bindings.
     */
    private QueryBindingSet deserializeKey(Key key, StarQuery sq, BindingSet currentBs, Set<String> unCommonVar) {

        QueryBindingSet currentSolutionBs = new QueryBindingSet();

        Text row = key.getRow();
        Text cq = key.getColumnQualifier();

        String[] cqArray = cq.toString().split(DocIndexIteratorUtil.DOC_ID_INDEX_DELIM);

        boolean commonVarSet = false;

        // if common Var is constant there is no common variable to assign a value to
        if (sq.commonVarConstant()) {
            commonVarSet = true;
        }

        if (!commonVarSet && sq.isCommonVarURI()) {
            RyaURI rURI = new RyaURI(row.toString());
            currentSolutionBs.addBinding(sq.getCommonVarName(),
                    RyaToRdfConversions.convertValue(rURI));
            commonVarSet = true;
        }

        for (String s : sq.getUnCommonVars()) {

            byte[] cqBytes = cqArray[sq.getVarPos().get(s)].getBytes();
            int firstIndex = Bytes.indexOf(cqBytes, DELIM_BYTE);
            int secondIndex = Bytes.lastIndexOf(cqBytes, DELIM_BYTE);
            int typeIndex = Bytes.indexOf(cqBytes, TYPE_DELIM_BYTE);
            byte[] tripleComponent = Arrays.copyOfRange(cqBytes, firstIndex + 1, secondIndex);
            byte[] cqContent = Arrays.copyOfRange(cqBytes, secondIndex + 1, typeIndex);
            byte[] objType = Arrays.copyOfRange(cqBytes, typeIndex, cqBytes.length);

            if ((new String(tripleComponent)).equals("object")) {
                byte[] object = Bytes.concat(cqContent, objType);
                org.openrdf.model.Value v = null;
                try {
                    v = RyaToRdfConversions.convertValue(RyaContext.getInstance().deserialize(
                            object));
                } catch (RyaTypeResolverException e) {
                    // NOTE(review): deliberately best-effort — leaves v null and
                    // binds a null value; consider propagating instead. Confirm
                    // callers tolerate null bindings before changing.
                    e.printStackTrace();
                }
                currentSolutionBs.addBinding(s, v);

            } else if ((new String(tripleComponent)).equals("subject")) {
                if (!commonVarSet) {
                    byte[] object = Bytes.concat(row.getBytes(), objType);
                    org.openrdf.model.Value v = null;
                    try {
                        v = RyaToRdfConversions.convertValue(RyaContext.getInstance().deserialize(
                                object));
                    } catch (RyaTypeResolverException e) {
                        e.printStackTrace();
                    }
                    currentSolutionBs.addBinding(sq.getCommonVarName(), v);
                    commonVarSet = true;
                }
                RyaURI rURI = new RyaURI(new String(cqContent));
                currentSolutionBs.addBinding(s, RyaToRdfConversions.convertValue(rURI));
            } else {
                throw new IllegalArgumentException("Invalid row.");
            }
        }
        for (String s : unCommonVar) {
            currentSolutionBs.addBinding(s, currentBs.getValue(s));
        }
        return currentSolutionBs;
    }

    /**
     * Opens a {@link BatchScanner} over the entity table for the given star
     * query. An empty {@code ranges} means "scan the common-variable constant's
     * row, or everything when the query has no constant".
     *
     * @throws QueryEvaluationException wrapping any Accumulo failure (the
     *             original printed the stack trace and threw a bare,
     *             cause-less QueryEvaluationException)
     */
    private BatchScanner runQuery(StarQuery query, Collection<Range> ranges) throws QueryEvaluationException {

        try {
            if (ranges.size() == 0) {
                String rangeText = query.getCommonVarValue();
                Range r;
                if (rangeText != null) {
                    r = new Range(new Text(query.getCommonVarValue()));
                } else {
                    r = new Range();
                }
                ranges = Collections.singleton(r);
            }

            Connector accCon = ConfigUtils.getConnector(conf);
            IteratorSetting is = new IteratorSetting(30, "fii", DocumentIndexIntersectingIterator.class);

            DocumentIndexIntersectingIterator.setColumnFamilies(is, query.getColumnCond());

            if (query.hasContext()) {
                DocumentIndexIntersectingIterator.setContext(is, query.getContextURI());
            }
            bs = accCon.createBatchScanner(ConfigUtils.getEntityTableName(conf),
                    new Authorizations(conf.get(ConfigUtils.CLOUDBASE_AUTHS)), 15);
            bs.addScanIterator(is);
            bs.setRanges(ranges);

            return bs;

        } catch (TableNotFoundException e) {
            throw new QueryEvaluationException(e);
        } catch (AccumuloException e) {
            throw new QueryEvaluationException(e);
        } catch (AccumuloSecurityException e) {
            throw new QueryEvaluationException(e);
        }
    }

    @Override
    public void close() throws IOException {
        //TODO generate an exception when BS passed in -- scanner closed
//        if (bs != null) {
//            bs.close();
//        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java new file mode 100644
index 0000000..2275d41 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java @@ -0,0 +1,232 @@
package mvm.rya.indexing.accumulo.entity;

import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV;
import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE;
import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTES;
import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES;
import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

import mvm.rya.accumulo.AccumuloRdfConfiguration;
import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer;
import mvm.rya.accumulo.experimental.AccumuloIndexer;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.domain.RyaType;
import mvm.rya.api.domain.RyaURI;
import mvm.rya.api.resolver.RdfToRyaConversions;
import mvm.rya.api.resolver.RyaContext;
import mvm.rya.api.resolver.RyaTypeResolverException;
import mvm.rya.api.resolver.triple.TripleRow;
import mvm.rya.indexing.accumulo.ConfigUtils;
import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.openrdf.model.Statement;
import org.openrdf.query.algebra.evaluation.QueryOptimizer;
import org.openrdf.query.algebra.evaluation.impl.BindingAssigner;
import org.openrdf.query.algebra.evaluation.impl.CompareOptimizer;
import org.openrdf.query.algebra.evaluation.impl.ConjunctiveConstraintSplitter;
import org.openrdf.query.algebra.evaluation.impl.ConstantOptimizer;
import org.openrdf.query.algebra.evaluation.impl.DisjunctiveConstraintOptimizer;
import org.openrdf.query.algebra.evaluation.impl.FilterOptimizer;
import org.openrdf.query.algebra.evaluation.impl.IterativeEvaluationOptimizer;
import org.openrdf.query.algebra.evaluation.impl.OrderLimitOptimizer;
import org.openrdf.query.algebra.evaluation.impl.QueryModelNormalizer;
import org.openrdf.query.algebra.evaluation.impl.SameTermFilterOptimizer;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.primitives.Bytes;

/**
 * Secondary index that stores each statement twice — once keyed by subject and
 * once keyed by object — so that star queries can be answered by a single scan
 * over the "entity" row.
 */
public class EntityCentricIndex extends AbstractAccumuloIndexer {

    private static final Logger logger = Logger.getLogger(EntityCentricIndex.class);
    private static final String TABLE_SUFFIX = "EntityCentricIndex";

    private AccumuloRdfConfiguration conf;
    private BatchWriter writer;
    private boolean isInit = false;

    public static final String CONF_TABLE_SUFFIX = "ac.indexer.eci.tablename";

    private void init() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException,
            TableExistsException {
        ConfigUtils.createTableIfNotExists(conf, ConfigUtils.getEntityTableName(conf));
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    //initialization occurs in setConf because index is created using reflection
    @Override
    public void setConf(Configuration conf) {
        if (conf instanceof AccumuloRdfConfiguration) {
            this.conf = (AccumuloRdfConfiguration) conf;
        } else {
            this.conf = new AccumuloRdfConfiguration(conf);
        }
        if (!isInit) {
            try {
                init();
                isInit = true;
            } catch (AccumuloException e) {
                logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
                throw new RuntimeException(e);
            } catch (AccumuloSecurityException e) {
                logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
                throw new RuntimeException(e);
            } catch (TableNotFoundException e) {
                logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
                throw new RuntimeException(e);
            } catch (TableExistsException e) {
                logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
                throw new RuntimeException(e);
            } catch (IOException e) {
                logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e);
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public String getTableName() {
        return ConfigUtils.getEntityTableName(conf);
    }

    @Override
    public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException {
        try {
            this.writer = writer.getBatchWriter(getTableName());
        } catch (AccumuloException e) {
            throw new IOException(e);
        } catch (AccumuloSecurityException e) {
            throw new IOException(e);
        } catch (TableNotFoundException e) {
            throw new IOException(e);
        }

    }

    /** Writes both rows (subject-keyed and object-keyed) for {@code stmt}. */
    public void storeStatement(RyaStatement stmt) throws IOException {
        Preconditions.checkNotNull(writer, "BatchWriter not Set");
        try {
            for (TripleRow row : serializeStatement(stmt)) {
                writer.addMutation(createMutation(row));
            }
        } catch (MutationsRejectedException e) {
            throw new IOException(e);
        } catch (RyaTypeResolverException e) {
            throw new IOException(e);
        }
    }

    /** Deletes both rows (subject-keyed and object-keyed) for {@code stmt}. */
    public void deleteStatement(RyaStatement stmt) throws IOException {
        Preconditions.checkNotNull(writer, "BatchWriter not Set");
        try {
            for (TripleRow row : serializeStatement(stmt)) {
                writer.addMutation(deleteMutation(row));
            }
        } catch (MutationsRejectedException e) {
            throw new IOException(e);
        } catch (RyaTypeResolverException e) {
            throw new IOException(e);
        }
    }

    protected Mutation deleteMutation(TripleRow tripleRow) {
        Mutation m = new Mutation(new Text(tripleRow.getRow()));

        byte[] columnFamily = tripleRow.getColumnFamily();
        Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily);

        byte[] columnQualifier = tripleRow.getColumnQualifier();
        Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier);

        // FIX: mirror createMutation() — a null column visibility previously
        // flowed straight into new ColumnVisibility(null) and threw an NPE.
        byte[] columnVisibility = tripleRow.getColumnVisibility();
        ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility);

        m.putDelete(cfText, cqText, cv, tripleRow.getTimestamp());
        return m;
    }

    /** @return the insert mutations (one per serialized row) for {@code stmt}. */
    public static Collection<Mutation> createMutations(RyaStatement stmt) throws RyaTypeResolverException {
        Collection<Mutation> m = Lists.newArrayList();
        for (TripleRow tr : serializeStatement(stmt)) {
            m.add(createMutation(tr));
        }
        return m;
    }

    private static Mutation createMutation(TripleRow tripleRow) {
        Mutation mutation = new Mutation(new Text(tripleRow.getRow()));
        byte[] columnVisibility = tripleRow.getColumnVisibility();
        ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility);
        Long timestamp = tripleRow.getTimestamp();
        byte[] value = tripleRow.getValue();
        Value v = value == null ? EMPTY_VALUE : new Value(value);
        byte[] columnQualifier = tripleRow.getColumnQualifier();
        Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier);
        byte[] columnFamily = tripleRow.getColumnFamily();
        Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily);

        mutation.put(cfText, cqText, cv, timestamp, v);
        return mutation;
    }

    /**
     * Produces the two rows for a statement: subject-keyed (qualifier tagged
     * "object") and object-keyed (qualifier tagged "subject").
     */
    private static List<TripleRow> serializeStatement(RyaStatement stmt) throws RyaTypeResolverException {
        RyaURI subject = stmt.getSubject();
        RyaURI predicate = stmt.getPredicate();
        RyaType object = stmt.getObject();
        RyaURI context = stmt.getContext();
        Long timestamp = stmt.getTimestamp();
        byte[] columnVisibility = stmt.getColumnVisibility();
        byte[] value = stmt.getValue();
        assert subject != null && predicate != null && object != null;
        // NOTE(review): getData().getBytes() uses the platform default charset —
        // presumably these should be UTF-8; confirm against how the core Rya
        // tables encode row data before changing (existing rows were written
        // with the default charset).
        byte[] cf = (context == null) ? EMPTY_BYTES : context.getData().getBytes();
        byte[] subjBytes = subject.getData().getBytes();
        byte[] predBytes = predicate.getData().getBytes();
        byte[][] objBytes = RyaContext.getInstance().serializeType(object);

        return Lists.newArrayList(new TripleRow(subjBytes, //
                predBytes, //
                Bytes.concat(cf, DELIM_BYTES, //
                        "object".getBytes(), DELIM_BYTES, //
                        objBytes[0], objBytes[1]), //
                timestamp, //
                columnVisibility, //
                value//
                ),

                new TripleRow(objBytes[0], //
                        predBytes, //
                        Bytes.concat(cf, DELIM_BYTES, //
                                "subject".getBytes(), DELIM_BYTES, //
                                subjBytes, objBytes[1]), //
                        timestamp, //
                        columnVisibility, //
                        value//
                ));
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java new file mode 100644 index 0000000..0a9a91e --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java @@ -0,0 +1,151 @@
package mvm.rya.indexing.accumulo.entity;

import java.util.Collections;
import java.util.HashMap;
import 
java.util.HashSet; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.Set; +import mvm.rya.indexing.accumulo.ConfigUtils; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; + +public class EntityLocalityGroupSetter { + + + String tablePrefix; + Connector conn; + Configuration conf; + + public EntityLocalityGroupSetter(String tablePrefix, Connector conn, Configuration conf) { + this.conn = conn; + this.tablePrefix = tablePrefix; + this.conf = conf; + } + + + + private Iterator<String> getPredicates() { + + String auths = conf.get(ConfigUtils.CLOUDBASE_AUTHS); + BatchScanner bs = null; + try { + bs = conn.createBatchScanner(tablePrefix + "prospects", new Authorizations(auths), 10); + } catch (TableNotFoundException e) { + e.printStackTrace(); + } + bs.setRanges(Collections.singleton(Range.prefix(new Text("predicate" + "\u0000")))); + final Iterator<Entry<Key,Value>> iter = bs.iterator(); + + return new Iterator<String>() { + + private String next = null; + private boolean hasNextCalled = false; + private boolean isEmpty = false; + + @Override + public boolean hasNext() { + + if (!hasNextCalled && !isEmpty) { + while (iter.hasNext()) { + Entry<Key,Value> temp = iter.next(); + String row = temp.getKey().getRow().toString(); + String[] rowArray = row.split("\u0000"); + next = rowArray[1]; + + hasNextCalled = true; + return true; + } + isEmpty = true; + return false; + } else if(isEmpty) { + return 
false; + }else { + return true; + } + } + + @Override + public String next() { + + if (hasNextCalled) { + hasNextCalled = false; + return next; + } else if(isEmpty) { + throw new NoSuchElementException(); + }else { + if (this.hasNext()) { + hasNextCalled = false; + return next; + } else { + throw new NoSuchElementException(); + } + } + } + + @Override + public void remove() { + + throw new UnsupportedOperationException("Cannot delete from iterator!"); + + } + + }; + } + + + + + + + + + public void setLocalityGroups() { + + HashMap<String, Set<Text>> localityGroups = new HashMap<String, Set<Text>>(); + Iterator<String> groups = getPredicates(); + + int i = 1; + + while(groups.hasNext()) { + HashSet<Text> tempColumn = new HashSet<Text>(); + String temp = groups.next(); + tempColumn.add(new Text(temp)); + String groupName = "predicate" + i; + localityGroups.put(groupName, tempColumn); + i++; + } + + + try { + conn.tableOperations().setLocalityGroups(tablePrefix + "doc_partitioned_index", localityGroups); + //conn.tableOperations().compact(tablePrefix + "doc_partitioned_index", null, null, true, true); + } catch (AccumuloException e) { + e.printStackTrace(); + } catch (AccumuloSecurityException e) { + e.printStackTrace(); + } catch (TableNotFoundException e) { + e.printStackTrace(); + } + + + + } + + + + + + + +}
