http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java deleted file mode 100644 index e2f98b3..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java +++ /dev/null @@ -1,824 +0,0 @@ -package mvm.rya.indexing.accumulo.temporal; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import info.aduna.iteration.CloseableIteration; - -import java.io.IOException; -import java.nio.charset.CharacterCodingException; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; -import java.util.NoSuchElementException; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import javax.xml.datatype.XMLGregorianCalendar; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; -import mvm.rya.accumulo.experimental.AccumuloIndexer; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.resolver.RyaToRdfConversions; -import mvm.rya.indexing.KeyParts; -import mvm.rya.indexing.StatementContraints; -import mvm.rya.indexing.TemporalIndexer; -import mvm.rya.indexing.TemporalInstant; -import mvm.rya.indexing.TemporalInterval; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.accumulo.StatementSerializer; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.MultiTableBatchWriter; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.ScannerBase; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.commons.codec.binary.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; -import org.joda.time.DateTime; -import org.openrdf.model.Literal; -import org.openrdf.model.Resource; -import org.openrdf.model.Statement; -import org.openrdf.model.URI; -import org.openrdf.query.QueryEvaluationException; - -import cern.colt.Arrays; - -public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements TemporalIndexer { - - private static final Logger logger = Logger.getLogger(AccumuloTemporalIndexer.class); - - private static final String CF_INTERVAL = "interval"; - - - - // Delimiter used in the interval stored in the triple's object literal. - // So far, no ontology specifies a date range, just instants. - // Set to the same delimiter used by the indexer, probably needs revisiting. - //private static final String REGEX_intervalDelimiter = TemporalInterval.DELIMITER; - - private Configuration conf; - - private MultiTableBatchWriter mtbw; - - private BatchWriter temporalIndexBatchWriter; - - private Set<URI> validPredicates; - private String temporalIndexTableName; - - private boolean isInit = false; - - - - private void init() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, - TableExistsException { - temporalIndexTableName = ConfigUtils.getTemporalTableName(conf); - // Create one index table on first run. - ConfigUtils.createTableIfNotExists(conf, temporalIndexTableName); - - mtbw = ConfigUtils.createMultitableBatchWriter(conf); - - temporalIndexBatchWriter = mtbw.getBatchWriter(temporalIndexTableName); - - validPredicates = ConfigUtils.getTemporalPredicates(conf); - } - - //initialization occurs in setConf because index is created using reflection - @Override - public void setConf(Configuration conf) { - this.conf = conf; - if (!isInit) { - try { - init(); - isInit = true; - } catch (AccumuloException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } catch (AccumuloSecurityException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } catch (TableNotFoundException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } catch (TableExistsException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } - } - } - - @Override - public Configuration getConf() { - return this.conf; - } - - - /** - * Store a statement in the index if it meets the criterion: Object should be - * a literal and one of the validPredicates from the configuration. - * If it does not meet the criteria, it is silently ignored. - * logs a warning if the object is not parse-able. - * Attempts to parse with calendarValue = literalValue.calendarValue() - * if that fails, tries: org.joda.time.DateTime.parse() . - * T O D O parse an interval using multiple predicates for same subject -- ontology dependent. - */ - private void storeStatement(Statement statement) throws IOException, IllegalArgumentException { - // if the predicate list is empty, accept all predicates. - // Otherwise, make sure the predicate is on the "valid" list - boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); - if (!isValidPredicate || !(statement.getObject() instanceof Literal)) - return; - DateTime[] indexDateTimes = new DateTime[2]; // 0 begin, 1 end of interval - extractDateTime(statement, indexDateTimes); - if (indexDateTimes[0]==null) - return; - - // Add this as an instant, or interval. - try { - if (indexDateTimes[1] != null) { - TemporalInterval interval = new TemporalInterval(new TemporalInstantRfc3339(indexDateTimes[0]), new TemporalInstantRfc3339(indexDateTimes[1])); - addInterval(temporalIndexBatchWriter, interval, statement); - } else { - TemporalInstant instant = new TemporalInstantRfc3339(indexDateTimes[0]); - addInstant(temporalIndexBatchWriter, instant, statement); - } - } catch (MutationsRejectedException e) { - throw new IOException("While adding interval/instant for statement =" + statement, e); - } - } - - - @Override - public void storeStatement(RyaStatement statement) throws IllegalArgumentException, IOException { - storeStatement(RyaToRdfConversions.convertStatement(statement)); - } - - - - /** - * parse the literal dates from the object of a statement. - * - * @param statement - * @param outputDateTimes - */ - private void extractDateTime(Statement statement, DateTime[] outputDateTimes) { - if (!(statement.getObject() instanceof Literal)) // Error since it should already be tested by caller. - throw new RuntimeException("Statement's object must be a literal: " + statement); - // throws IllegalArgumentException NumberFormatException if can't parse - String logThis = null; Literal literalValue = (Literal) statement.getObject(); - // First attempt to parse a interval in the form "[date1,date2]" - Matcher matcher = Pattern.compile("\\[(.*)\\,(.*)\\].*").matcher(literalValue.stringValue()); - if (matcher.find()) { - try { - // Got a datetime pair, parse into an interval. - outputDateTimes[0] = new DateTime(matcher.group(1)); - outputDateTimes[1] = new DateTime(matcher.group(2)); - return; - } catch (java.lang.IllegalArgumentException e) { - logThis = e.getMessage() + " " + logThis; - outputDateTimes[0]=null; - outputDateTimes[1]=null; - } - } - - try { - XMLGregorianCalendar calendarValue = literalValue.calendarValue(); - outputDateTimes[0] = new DateTime(calendarValue.toGregorianCalendar()); - outputDateTimes[1] = null; - return; - } catch (java.lang.IllegalArgumentException e) { - logThis = e.getMessage(); - } - // Try again using Joda Time DateTime.parse() - try { - outputDateTimes[0] = DateTime.parse(literalValue.stringValue()); - outputDateTimes[1] = null; - //System.out.println(">>>>>>>Joda parsed: "+literalValue.stringValue()); - return; - } catch (java.lang.IllegalArgumentException e) { - logThis = e.getMessage() + " " + logThis; - } - logger.warn("TemporalIndexer is unable to parse the date/time from statement=" + statement.toString() + " " +logThis); - return; - } - - /** - * Index a new interval - * TODO: integrate into KeyParts (or eliminate) - * @param writer - * @param cv - * @param interval - * @throws MutationsRejectedException - */ - public void addInterval(BatchWriter writer, TemporalInterval interval, Statement statement) throws MutationsRejectedException { - - Value statementValue = new Value(StringUtils.getBytesUtf8(StatementSerializer.writeStatement(statement))); - Text cf = new Text(StatementSerializer.writeContext(statement)); - Text cqBegin = new Text(KeyParts.CQ_BEGIN); - Text cqEnd = new Text(KeyParts.CQ_END); - - // Start Begin index - Text keyText =new Text(interval.getAsKeyBeginning()); - KeyParts.appendUniqueness(statement, keyText); - Mutation m = new Mutation(keyText); - m.put(cf, cqBegin, statementValue); - // System.out.println("mutations add begin row=" + m.getRow() + " value=" + value.toString()); - writer.addMutation(m); - - // now the end index: - keyText = new Text(interval.getAsKeyEnd()); - KeyParts.appendUniqueness(statement, keyText); - m = new Mutation(keyText); - m.put(cf, cqEnd, new Value(statementValue)); - // System.out.println("mutations add end row=" + m.getRow() + " value=" + value.toString()); - writer.addMutation(m); - } - - - /** - * Index a new interval - * Make indexes that handle this expression: - * hash( s? p? ) ?o - * == o union hash(s)o union hash(p)o union hash(sp)o - * - * @param writer - * @param cv - * @param instant - * @throws MutationsRejectedException - */ - public void addInstant(BatchWriter writer, TemporalInstant instant, Statement statement) throws MutationsRejectedException { - KeyParts keyParts = new KeyParts(statement, instant); - for (KeyParts k: keyParts) { - Mutation m = new Mutation(k.getStoreKey()); - m.put(k.cf, k.cq,k.getValue()); - writer.addMutation(m); - } - } - - - /** - * creates a scanner and handles all the throwables and nulls. - * - * @param scanner - * @return - * @throws IOException - */ - private Scanner getScanner() throws QueryEvaluationException { - String whileDoing = "While creating a scanner for a temporal query. table name=" + temporalIndexTableName; - Scanner scanner = null; - try { - scanner = ConfigUtils.createScanner(temporalIndexTableName, conf); - } catch (AccumuloException e) { - logger.error(whileDoing, e); - throw new QueryEvaluationException(whileDoing, e); - } catch (AccumuloSecurityException e) { - throw new QueryEvaluationException(whileDoing, e); - } catch (TableNotFoundException e) { - logger.error(whileDoing, e); - throw new QueryEvaluationException(whileDoing - + " The temporal index table should have been created by this constructor, if found missing.", e); - } - return scanner; - } - - private BatchScanner getBatchScanner() throws QueryEvaluationException { - String whileDoing = "While creating a Batch scanner for a temporal query. table name=" + temporalIndexTableName; - try { - return ConfigUtils.createBatchScanner(temporalIndexTableName, conf); - } catch (AccumuloException e) { - logger.error(whileDoing, e); - throw new QueryEvaluationException(whileDoing, e); - } catch (AccumuloSecurityException e) { - throw new QueryEvaluationException(whileDoing, e); - } catch (TableNotFoundException e) { - logger.error(whileDoing, e); - throw new QueryEvaluationException(whileDoing - + " The temporal index table should have been created by this constructor, if found missing. ", e); - } - } - - - /** - * statements where the datetime is exactly the same as the queryInstant. - */ - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryInstantEqualsInstant( - TemporalInstant queryInstant, StatementContraints constraints) - throws QueryEvaluationException { - // get rows where the repository time is equal to the given time in queryInstant. - Query query = new Query() { - @Override - public Range getRange(KeyParts keyParts) { - //System.out.println("Scanning queryInstantEqualsInstant: prefix:" + KeyParts.toHumanString(keyParts.getQueryKey())); - return Range.prefix(keyParts.getQueryKey()); // <-- specific logic - } - }; - ScannerBase scanner = query.doQuery(queryInstant, constraints); - // TODO currently context constraints are filtered on the client. - return getContextIteratorWrapper(scanner, constraints.getContext()); - } - - /** - * get statements where the db row ID is BEFORE the given queryInstant. - */ - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryInstantBeforeInstant( - TemporalInstant queryInstant, StatementContraints constraints) - throws QueryEvaluationException { - // get rows where the repository time is before the given time. - Query query = new Query() { - @Override - public Range getRange(KeyParts keyParts) { - Text start= null; - if (keyParts.constraintPrefix != null ) // Yes, has constraints - start = keyParts.constraintPrefix; // <-- start specific logic - else - start = new Text(KeyParts.HASH_PREFIX_FOLLOWING); - Text endAt = keyParts.getQueryKey(); // <-- end specific logic - //System.out.println("Scanning queryInstantBeforeInstant: from:" + KeyParts.toHumanString(start) + " up to:" + KeyParts.toHumanString(endAt)); - return new Range(start, true, endAt, false); - } - }; - ScannerBase scanner = query.doQuery(queryInstant, constraints); - return getContextIteratorWrapper(scanner, constraints.getContext()); - } - - /** - * get statements where the date object is after the given queryInstant. - */ - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryInstantAfterInstant( - TemporalInstant queryInstant, StatementContraints constraints) - throws QueryEvaluationException { - Query query = new Query() { - @Override - public Range getRange(KeyParts keyParts) { - Text start = Range.followingPrefix(keyParts.getQueryKey()); // <-- specific logic - Text endAt = null; // no constraints // <-- specific logic - if (keyParts.constraintPrefix != null ) // Yes, has constraints - endAt = Range.followingPrefix(keyParts.constraintPrefix); - //System.out.println("Scanning queryInstantAfterInstant from after:" + KeyParts.toHumanString(start) + " up to:" + KeyParts.toHumanString(endAt)); - return new Range(start, true, endAt, false); - } - }; - ScannerBase scanner = query.doQuery(queryInstant, constraints); - return getContextIteratorWrapper(scanner, constraints.getContext()); - } - - /** - * Get instances before a given interval. Returns queryInstantBeforeInstant with the interval's beginning time. - */ - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryInstantBeforeInterval( - TemporalInterval givenInterval, StatementContraints contraints) - throws QueryEvaluationException { - return queryInstantBeforeInstant(givenInterval.getHasBeginning(), contraints); - } - - /** - * Get instances after a given interval. Returns queryInstantAfterInstant with the interval's end time. - */ - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryInstantAfterInterval( - TemporalInterval givenInterval, StatementContraints contraints) throws QueryEvaluationException { - return queryInstantAfterInstant(givenInterval.getHasEnd(), contraints); - } - - /** - * Get instances inside a given interval. - * Returns after interval's beginning time, and before ending time, - * exclusive (don't match the beginning and ending). - */ - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryInstantInsideInterval( - TemporalInterval queryInterval, StatementContraints constraints) - throws QueryEvaluationException { - // get rows where the time is after the given interval's beginning time and before the ending time. - final TemporalInterval theQueryInterval = queryInterval; - Query query = new Query() { - private final TemporalInterval queryInterval = theQueryInterval; - @Override - public Range getRange(KeyParts keyParts) { - Text start = Range.followingPrefix(new Text(keyParts.getQueryKey(queryInterval.getHasBeginning()))); - Text endAt = new Text(keyParts.getQueryKey(queryInterval.getHasEnd())); // <-- end specific logic - //System.out.println("Scanning queryInstantInsideInterval: from excluding:" + KeyParts.toHumanString(start) + " up to:" + KeyParts.toHumanString(endAt)); - return new Range(start, false, endAt, false); - } - }; - ScannerBase scanner = query.doQuery(queryInterval.getHasBeginning(), constraints); - return getContextIteratorWrapper(scanner, constraints.getContext()); - } - /** - * Get instances matching the beginning of a given interval. - */ - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryInstantHasBeginningInterval( - TemporalInterval queryInterval, StatementContraints contraints) - throws QueryEvaluationException { - return queryInstantEqualsInstant(queryInterval.getHasBeginning(), contraints); - } - - /** - * Get instances matching the ending of a given interval. - */ - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryInstantHasEndInterval( - TemporalInterval queryInterval, StatementContraints contraints) - throws QueryEvaluationException { - return queryInstantEqualsInstant(queryInterval.getHasEnd(), contraints); - } - - /** - * Get intervals stored in the repository matching the given interval. - * Indexing Intervals will probably change or be removed. - * Currently predicate and subject constraints are filtered on the client. - */ - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryIntervalEquals( - TemporalInterval query, StatementContraints contraints) - throws QueryEvaluationException { - Scanner scanner = getScanner(); - if (scanner != null) { - // get rows where the start and end match. - Range range = Range.prefix(new Text(query.getAsKeyBeginning())); - scanner.setRange(range); - if (contraints.hasContext()) - scanner.fetchColumn(new Text(contraints.getContext().toString()), new Text(KeyParts.CQ_BEGIN)); - else - scanner.fetchColumn(new Text(""), new Text(KeyParts.CQ_BEGIN)); - } - // Iterator<Entry<Key, Value>> iter = scanner.iterator(); - // while (iter.hasNext()) { - // System.out.println("queryIntervalEquals results:"+iter.next()); - // } - //return getConstrainedIteratorWrapper(scanner, contraints); - return getIteratorWrapper(scanner); - } - - /** - * find intervals stored in the repository before the given Interval. Find interval endings that are - * before the given beginning. - * Indexing Intervals will probably change or be removed. - * Currently predicate and subject constraints are filtered on the client. - */ - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryIntervalBefore( - TemporalInterval queryInterval, StatementContraints constraints) throws QueryEvaluationException - { - Scanner scanner = getScanner(); - if (scanner != null) { - // get rows where the end date is less than the queryInterval.getBefore() - Range range = new Range(null, false, new Key(new Text(queryInterval.getHasBeginning().getAsKeyBytes())), false); - scanner.setRange(range); - if (constraints.hasContext()) - scanner.fetchColumn(new Text(constraints.getContext().toString()), new Text(KeyParts.CQ_END)); - else - scanner.fetchColumn(new Text(""), new Text(KeyParts.CQ_END)); - } - return getIteratorWrapper(scanner); - } - - /** - * Interval after given interval. Find intervals that begin after the endings of the given interval. - * Use the special following prefix mechanism to avoid matching the beginning date. - * Indexing Intervals will probably change or be removed. - * Currently predicate and subject and context constraints are filtered on the client. - */ - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryIntervalAfter( - TemporalInterval queryInterval, StatementContraints constraints) - throws QueryEvaluationException { - - Scanner scanner = getScanner(); - if (scanner != null) { - // get rows where the start date is greater than the queryInterval.getEnd() - Range range = new Range(new Key(Range.followingPrefix(new Text(queryInterval.getHasEnd().getAsKeyBytes()))), false, null, true); - scanner.setRange(range); - - if (constraints.hasContext()) - scanner.fetchColumn(new Text(constraints.getContext().toString()), new Text(KeyParts.CQ_BEGIN)); - else - scanner.fetchColumn(new Text(""), new Text(KeyParts.CQ_BEGIN)); - } - // TODO currently predicate, subject and context constraints are filtered on the clients - return getIteratorWrapper(scanner); - } - // -- - // -- END of Query functions. Next up, general stuff used by the queries above. - // -- - - /** - * Allows passing range specific logic into doQuery. - * Each query function implements an anonymous instance of this and calls it's doQuery(). - */ - abstract class Query { - abstract protected Range getRange(KeyParts keyParts); - - public ScannerBase doQuery(TemporalInstant queryInstant, StatementContraints constraints) throws QueryEvaluationException { - // key is contraintPrefix + time, or just time. - // Any constraints handled here, if the constraints are empty, the - // thisKeyParts.contraintPrefix will be null. - List<KeyParts> keyParts = KeyParts.keyPartsForQuery(queryInstant, constraints); - ScannerBase scanner = null; - if (keyParts.size() > 1) - scanner = getBatchScanner(); - else - scanner = getScanner(); - - Collection<Range> ranges = new HashSet<Range>(); - KeyParts lastKeyParts = null; - Range range = null; - for (KeyParts thisKeyParts : keyParts) { - range = this.getRange(thisKeyParts); - ranges.add(range); - lastKeyParts = thisKeyParts; - } - //System.out.println("Scanning columns, cf:" + lastKeyParts.cf + "CQ:" + lastKeyParts.cq); - scanner.fetchColumn(new Text(lastKeyParts.cf), new Text(lastKeyParts.cq)); - if (scanner instanceof BatchScanner) - ((BatchScanner) scanner).setRanges(ranges); - else if (range != null) - ((Scanner) scanner).setRange(range); - return scanner; - } - } - - /** - * An iteration wrapper for a loaded scanner that is returned for each query above. - * - * @param scanner - * the results to iterate, then close. - * @return an anonymous object that will iterate the resulting statements from a given scanner. - */ - private static CloseableIteration<Statement, QueryEvaluationException> getIteratorWrapper(final ScannerBase scanner) { - - final Iterator<Entry<Key, Value>> i = scanner.iterator(); - - return new CloseableIteration<Statement, QueryEvaluationException>() { - @Override - public boolean hasNext() { - return i.hasNext(); - } - - @Override - public Statement next() throws QueryEvaluationException { - Entry<Key, Value> entry = i.next(); - Value v = entry.getValue(); - try { - String dataString = Text.decode(v.get(), 0, v.getSize()); - Statement s = StatementSerializer.readStatement(dataString); - return s; - } catch (CharacterCodingException e) { - logger.error("Error decoding value=" + Arrays.toString(v.get()), e); - throw new QueryEvaluationException(e); - } catch (IOException e) { - logger.error("Error de-serializing statement, string=" + v.get(), e); - throw new QueryEvaluationException(e); - } - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Remove not implemented"); - } - - @Override - public void close() throws QueryEvaluationException { - scanner.close(); - } - }; - } - - - /** - * An iteration wrapper for a loaded scanner that is returned for partially supported interval queries above. - * - * @param scanner the results to iterate, then close. - * @param constraints limit statements returned by next() to those matching the constraints. - * @return an anonymous object that will iterate the resulting statements from a given scanner. - * @throws QueryEvaluationException - */ - private static CloseableIteration<Statement, QueryEvaluationException> getConstrainedIteratorWrapper(final Scanner scanner, final StatementContraints constraints) { - if (!constraints.hasContext() && !constraints.hasSubject() && !constraints.hasPredicates()) - return getIteratorWrapper(scanner); - return new ConstrainedIteratorWrapper(scanner) { - @Override - public boolean allowedBy(Statement statement) { - return allowedByConstraints(statement, constraints); - } - }; - } - /** - * An iteration wrapper for a loaded scanner that is returned for queries above. - * Currently, this temporal index supports contexts only on the client, using this filter. - * - * @param scanner the results to iterate, then close. - * @param constraints limit statements returned by next() to those matching the constraints. - * @return an anonymous object that will iterate the resulting statements from a given scanner. - * @throws QueryEvaluationException - */ - private static CloseableIteration<Statement, QueryEvaluationException> getContextIteratorWrapper(final ScannerBase scanner, final Resource context) { - if (context==null) - return getIteratorWrapper(scanner); - return new ConstrainedIteratorWrapper(scanner) { - @Override - public boolean allowedBy(Statement statement) { - return allowedByContext(statement, context); - } - }; - } - /** - * Wrap a scanner in a iterator that will filter statements based on a boolean allowedBy(). - * If the allowedBy function returns false for the next statement, it is skipped. - * This is used for to do client side, what the index cannot (yet) do on the server side. - */ - abstract static class ConstrainedIteratorWrapper implements CloseableIteration<Statement, QueryEvaluationException> { - private Statement nextStatement=null; - private boolean isInitialized = false; - final private Iterator<Entry<Key, Value>> i; - final private ScannerBase scanner; - - ConstrainedIteratorWrapper(ScannerBase scanner) { - this.scanner = scanner; - i=scanner.iterator(); - } - @Override - public boolean hasNext() throws QueryEvaluationException { - if (!isInitialized) - internalGetNext(); - return (nextStatement != null) ; - } - - @Override - public Statement next() throws QueryEvaluationException { - if (nextStatement==null) { - if (!isInitialized) - internalGetNext(); - if (nextStatement==null) - throw new NoSuchElementException(); - } - // use this one, then get the next one loaded. - Statement thisStatement = this.nextStatement; - internalGetNext(); - return thisStatement; - } - - /** - * Gets the next statement meeting constraints and stores in nextStatement. - * Sets null when all done, or on exception. - * @throws QueryEvaluationException - */ - private void internalGetNext() - throws QueryEvaluationException { - isInitialized=true; - this.nextStatement = null; // Default on done or error. - Statement statement = null; - while (i.hasNext()) { - Entry<Key, Value> entry = i.next(); - Value v = entry.getValue(); - try { - String dataString = Text.decode(v.get(), 0, v.getSize()); - statement = StatementSerializer.readStatement(dataString); - } catch (CharacterCodingException e) { - logger.error("Error decoding value=" + Arrays.toString(v.get()), e); - throw new QueryEvaluationException(e); - } catch (IOException e) { - logger.error("Error de-serializing statement, string=" + v.get(), e); - throw new QueryEvaluationException(e); - } - if (allowedBy(statement)) { - this.nextStatement = statement; - return; - } - } - } - public abstract boolean allowedBy(Statement s); - - @Override - public void remove() { - throw new UnsupportedOperationException("Remove not implemented"); - } - - @Override - public void close() throws QueryEvaluationException { - scanner.close(); - } - } - - /** - * Does the statement meet the constraints? Match predicate, subject, and context. - * @param statement Candidate statement to be allowed or not. - * @param contraints fields that are non-null must match the statement's components, otherwise it is not allowed. - * @return true if the parts of the statement match the statementConstraints' parts. - */ - protected static boolean allowedByConstraints(Statement statement, StatementContraints constraints) { - - if (constraints.hasSubject() && ! constraints.getSubject().toString().equals(statement.getSubject().toString())) - {System.out.println("Constrain subject: "+constraints.getSubject()+" != " + statement.getSubject()); return false;} - //return false; - - if (! allowedByContext(statement, constraints.getContext())) - return false; - //{System.out.println("Constrain context: "+constraints.getContext()+" != " + statement.getContext()); return false;} - - if (constraints.hasPredicates() && ! constraints.getPredicates().contains(statement.getPredicate())) - return false; - //{System.out.println("Constrain predicate: "+constraints.getPredicates()+" != " + statement.getPredicate()); return false;} - - System.out.println("allow statement: "+ statement.toString()); - return true; - } - - /** - * Allow only if the context matches the statement. This is a client side filter. - * @param statement - * @param context - * @return - */ - protected static boolean allowedByContext(Statement statement, Resource context) { - return context==null || context.equals( statement.getContext() ); - } - - @Override - public Set<URI> getIndexablePredicates() { - - return validPredicates; - } - - /** - * Flush the data to the batchwriter. - * Throws a IOException as required by the flushable interface, - * wrapping MutationsRejectedException. - */ - @Override - public void flush() throws IOException { - try { - mtbw.flush(); - } catch (MutationsRejectedException e) { - String msg = "Error while flushing the batch writer."; - logger.error(msg, e); - throw new IOException(msg, e); - } - } - - /** - * Close batchwriter. - * Throws a IOException as required by the flushable interface, - * wrapping MutationsRejectedException. - */ - @Override - public void close() throws IOException { - try { - - mtbw.close(); - - } catch (MutationsRejectedException e) { - String msg = "Error while closing the batch writer."; - logger.error(msg, e); - throw new IOException(msg, e); - } - } - - - - @Override - public String getTableName() { - return ConfigUtils.getTemporalTableName(conf); - } - - -}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalInstantRfc3339.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalInstantRfc3339.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalInstantRfc3339.java deleted file mode 100644 index a69a79f..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalInstantRfc3339.java +++ /dev/null @@ -1,218 +0,0 @@ -/** - * - */ -package mvm.rya.indexing.accumulo.temporal; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import mvm.rya.indexing.TemporalInstant; -import mvm.rya.indexing.TemporalInterval; - -import org.apache.commons.codec.binary.StringUtils; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.joda.time.format.DateTimeFormatter; -import org.joda.time.format.ISODateTimeFormat; - -/** - * Immutable date and time instance returning a human readable key. - * Preserves the Time zone, but not stored in the key. - * Converts fields (hours, etc) correctly for tz=Zulu when stored, - * so the original timezone is not preserved when retrieved. - * - * Uses rfc 3339, which looks like: YYYY-MM-DDThh:mm:ssZ a subset - * of ISO-8601 : https://www.ietf.org/rfc/rfc3339.txt - * - * Limits: All dates and times are assumed to be in the "current era", no BC, - * somewhere between 0000AD and 9999AD. - * - * Resolution: to the second, or millisecond if the optional fraction is used. - * - * This is really a wrapper for Joda DateTime. if you need functionality from - * that wonderful class, simply use t.getAsDateTime(). - * - */ -public class TemporalInstantRfc3339 implements TemporalInstant { - - private static final long serialVersionUID = -7790000399142290309L; - - private final DateTime dateTime; - /** - * Format key like this: YYYY-MM-DDThh:mm:ssZ - */ - public final static DateTimeFormatter FORMATTER = ISODateTimeFormat.dateTimeNoMillis(); - - /** - * New date assumed UTC time zone. - * - * @param year - * @param month - * @param day - * @param hour - * @param minute - * @param second - */ - public TemporalInstantRfc3339(int year, int month, int day, int hour, int minute, int second) { - dateTime = new DateTime(year, month, day, hour, minute, second, DateTimeZone.UTC); - } - - /** - * Construct with a Joda/java v8 DateTime; - * TZ is preserved, but not in the key. - * - * @param dateTime - * initialize with this date time. Converted to zulu time zone for key generation. - * @return - */ - public TemporalInstantRfc3339(DateTime datetime) { - this.dateTime = datetime; - } - /** - * Get an interval setting beginning and end with this implementation of {@link TemporalInstant}. - * beginning must be less than end. - * - * @param dateTimeInterval String in the form [dateTime1,dateTime2] - */ - public static TemporalInterval parseInterval(String dateTimeInterval) { - - Matcher matcher = Pattern.compile("\\[(.*)\\,(.*)\\].*").matcher(dateTimeInterval); - if (matcher.find()) { - // Got a date time pair, parse into an interval. - return new TemporalInterval( - new TemporalInstantRfc3339(new DateTime(matcher.group(1))), - new TemporalInstantRfc3339(new DateTime(matcher.group(2)))); - } - throw new IllegalArgumentException("Can't parse interval, expecting '[ISO8601dateTime1,ISO8601dateTime2]', actual: "+dateTimeInterval); - } - - /** - * if this is older returns -1, equal 0, else 1 - * - */ - @Override - public int compareTo(TemporalInstant that) { - return this.getAsKeyString().compareTo(that.getAsKeyString()); - } - - @Override - public byte[] getAsKeyBytes() { - return StringUtils.getBytesUtf8(getAsKeyString()); - } - - @Override - public String getAsKeyString() { - return dateTime.withZone(DateTimeZone.UTC).toString(FORMATTER); - } - - /** - * Readable string, formated local time at {@link DateTimeZone}. - * If the timezone is UTC (Z), it was probably a key from the database. - * If the server and client are in different Time zone, should probably use the client timezone. - * - * Time at specified time zone: - * instant.getAsReadable(DateTimeZone.forID("-05:00"))); - * instant.getAsReadable(DateTimeZone.getDefault())); - * - * Use original time zone set in the constructor: - * instant.getAsDateTime().toString(TemporalInstantRfc3339.FORMATTER)); - * - */ - @Override - public String getAsReadable(DateTimeZone dateTimeZone) { - return dateTime.withZone(dateTimeZone).toString(FORMATTER); - } - - /** - * Use original time zone set in the constructor, or UTC if from parsing the key. - */ - @Override - public String getAsReadable() { - return dateTime.toString(FORMATTER); - } - - /** - * default toString, same as getAsReadable(). - */ - @Override - public String toString() { - return getAsReadable(); - } - - /** - * Show readable time converted to the default timezone. - */ - @Override - public DateTime getAsDateTime() { - return dateTime; - } - - /** - * Minimum Date, used for infinitely past. - */ - private static final TemporalInstant MINIMUM = new TemporalInstantRfc3339(new DateTime(Long.MIN_VALUE)); - /** - * maximum date/time is used for infinitely in the future. - */ - private static final TemporalInstant MAXIMUM = new TemporalInstantRfc3339(new DateTime(Long.MAX_VALUE)); - - /** - * infinite past date. - * @return an instant that will compare as NEWER than anything but itself. - */ - public static TemporalInstant getMinimumInstance() { - return MINIMUM; - } - /** - * infinite future date. - * @return an instant that will compare as OLDER than anything but itself - */ - - public static TemporalInstant getMaximumInstance() { - return MAXIMUM; - } - - /* (non-Javadoc) - * @see java.lang.Object#hashCode() - */ - @Override - public int hashCode() { - return this.getAsKeyString().hashCode(); - } - - /* (non-Javadoc) - * @see java.lang.Object#equals(java.lang.Object) - */ - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - TemporalInstantRfc3339 other = (TemporalInstantRfc3339) obj; - return (this.getAsKeyString().equals(other.getAsKeyString())); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalTupleSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalTupleSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalTupleSet.java deleted file mode 100644 index f2ed8c4..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/temporal/TemporalTupleSet.java +++ /dev/null @@ -1,320 +0,0 @@ -package mvm.rya.indexing.accumulo.temporal; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import info.aduna.iteration.CloseableIteration; - -import java.util.Map; -import java.util.Set; - -import mvm.rya.indexing.IndexingExpr; -import mvm.rya.indexing.IteratorFactory; -import mvm.rya.indexing.SearchFunction; -import mvm.rya.indexing.SearchFunctionFactory; -import mvm.rya.indexing.StatementContraints; -import mvm.rya.indexing.TemporalIndexer; -import mvm.rya.indexing.TemporalInstant; -import mvm.rya.indexing.TemporalInterval; -import mvm.rya.indexing.accumulo.geo.GeoTupleSet; -import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; - -import org.apache.hadoop.conf.Configuration; -import org.joda.time.DateTime; -import org.openrdf.model.Statement; -import org.openrdf.model.URI; -import org.openrdf.model.impl.URIImpl; -import org.openrdf.query.BindingSet; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.algebra.QueryModelVisitor; - -import com.google.common.base.Joiner; -import com.google.common.collect.Maps; - -//Indexing Node for temporal expressions to be inserted into execution plan -//to delegate temporal portion of query to temporal index -public class TemporalTupleSet extends ExternalTupleSet { - - private Configuration conf; - private TemporalIndexer temporalIndexer; - private IndexingExpr filterInfo; - - - public TemporalTupleSet(IndexingExpr filterInfo, TemporalIndexer temporalIndexer) { - this.filterInfo = filterInfo; - this.temporalIndexer = temporalIndexer; - this.conf = temporalIndexer.getConf(); - } - - /** - * {@inheritDoc} - */ - @Override - public Set<String> getBindingNames() { - return filterInfo.getBindingNames(); - } - - /** - * {@inheritDoc} - * <p> - * Note that we need a deep copy for everything that (during optimizations) - * can be altered via {@link #visitChildren(QueryModelVisitor)} - */ - public TemporalTupleSet clone() { - return new TemporalTupleSet(filterInfo, temporalIndexer); - } - - @Override - public double cardinality() { - return 0.0; // No idea how the estimate cardinality here. - } - - - @Override - public String getSignature() { - - return "(TemporalTuple Projection) " + "variables: " + Joiner.on(", ").join(this.getBindingNames()).replaceAll("\\s+", " "); - } - - @Override - public boolean equals(Object other) { - if (other == this) { - return true; - } - if (!(other instanceof TemporalTupleSet)) { - return false; - } - TemporalTupleSet arg = (TemporalTupleSet) other; - return this.filterInfo.equals(arg.filterInfo); - } - - - @Override - public int hashCode() { - int result = 17; - result = 31*result + filterInfo.hashCode(); - - return result; - } - - - /** - * Returns an iterator over the result set associated with contained IndexingExpr. - * <p> - * Should be thread-safe (concurrent invocation {@link OfflineIterable} this - * method can be expected with some query evaluators. - */ - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) - throws QueryEvaluationException { - - - URI funcURI = filterInfo.getFunction(); - SearchFunction searchFunction = (new TemporalSearchFunctionFactory(conf)).getSearchFunction(funcURI); - - if(filterInfo.getArguments().length > 1) { - throw new IllegalArgumentException("Index functions do not support more than two arguments."); - } - - String queryText = filterInfo.getArguments()[0].stringValue(); - - return IteratorFactory.getIterator(filterInfo.getSpConstraint(), bindings, queryText, searchFunction); - } - - - //returns appropriate search function for a given URI - //search functions used by TemporalIndexer to query Temporal Index - private class TemporalSearchFunctionFactory { - - private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap(); - Configuration conf; - - public TemporalSearchFunctionFactory(Configuration conf) { - this.conf = conf; - } - - - /** - * Get a {@link TemporalSearchFunction} for a give URI. - * - * @param searchFunction - * @return - */ - public SearchFunction getSearchFunction(final URI searchFunction) { - - SearchFunction geoFunc = null; - - try { - geoFunc = getSearchFunctionInternal(searchFunction); - } catch (QueryEvaluationException e) { - e.printStackTrace(); - } - - return geoFunc; - } - - private SearchFunction getSearchFunctionInternal(final URI searchFunction) throws QueryEvaluationException { - SearchFunction sf = SEARCH_FUNCTION_MAP.get(searchFunction); - - if (sf != null) { - return sf; - } else { - throw new QueryEvaluationException("Unknown Search Function: " + searchFunction.stringValue()); - } - - - } - - - - private final SearchFunction TEMPORAL_InstantAfterInstant = new SearchFunction() { - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, - StatementContraints contraints) throws QueryEvaluationException { - TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms)); - return temporalIndexer.queryInstantAfterInstant(queryInstant, contraints); - } - - @Override - public String toString() { - return "TEMPORAL_InstantAfterInstant"; - }; - }; - private final SearchFunction TEMPORAL_InstantBeforeInstant = new SearchFunction() { - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, - StatementContraints contraints) throws QueryEvaluationException { - TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms)); - return temporalIndexer.queryInstantBeforeInstant(queryInstant, contraints); - } - - @Override - public String toString() { - return "TEMPORAL_InstantBeforeInstant"; - }; - }; - - private final SearchFunction TEMPORAL_InstantEqualsInstant = new SearchFunction() { - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, - StatementContraints contraints) throws QueryEvaluationException { - TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms)); - return temporalIndexer.queryInstantEqualsInstant(queryInstant, contraints); - } - - @Override - public String toString() { - return "TEMPORAL_InstantEqualsInstant"; - }; - }; - - private final SearchFunction TEMPORAL_InstantAfterInterval = new SearchFunction() { - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, - StatementContraints contraints) throws QueryEvaluationException { - TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms); - return temporalIndexer.queryInstantAfterInterval(queryInterval, contraints); - } - - @Override - public String toString() { - return "TEMPORAL_InstantAfterInterval"; - }; - }; - - private final SearchFunction TEMPORAL_InstantBeforeInterval = new SearchFunction() { - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, - StatementContraints contraints) throws QueryEvaluationException { - TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms); - return temporalIndexer.queryInstantBeforeInterval(queryInterval, contraints); - } - - @Override - public String toString() { - return "TEMPORAL_InstantBeforeInterval"; - }; - }; - - private final SearchFunction TEMPORAL_InstantInsideInterval = new SearchFunction() { - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, - StatementContraints contraints) throws QueryEvaluationException { - TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms); - return temporalIndexer.queryInstantInsideInterval(queryInterval, contraints); - } - - @Override - public String toString() { - return "TEMPORAL_InstantInsideInterval"; - }; - }; - - private final SearchFunction TEMPORAL_InstantHasBeginningInterval = new SearchFunction() { - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, - StatementContraints contraints) throws QueryEvaluationException { - TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms); - return temporalIndexer.queryInstantHasBeginningInterval(queryInterval, contraints); - } - - @Override - public String toString() { - return "TEMPORAL_InstantHasBeginningInterval"; - }; - }; - - private final SearchFunction TEMPORAL_InstantHasEndInterval = new SearchFunction() { - @Override - public CloseableIteration<Statement, QueryEvaluationException> performSearch(String searchTerms, - StatementContraints contraints) throws QueryEvaluationException { - TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms); - return temporalIndexer.queryInstantHasEndInterval(queryInterval, contraints); - } - - @Override - public String toString() { - return "TEMPORAL_InstantHasEndInterval"; - }; - }; - - { - - String TEMPORAL_NS = "tag:rya-rdf.org,2015:temporal#"; - - SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"after"), TEMPORAL_InstantAfterInstant); - SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"before"), TEMPORAL_InstantBeforeInstant); - SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"equals"), TEMPORAL_InstantEqualsInstant); - - SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"beforeInterval"), TEMPORAL_InstantBeforeInterval); - SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"afterInterval"), TEMPORAL_InstantAfterInterval); - SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"insideInterval"), TEMPORAL_InstantInsideInterval); - SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"hasBeginningInterval"), - TEMPORAL_InstantHasBeginningInterval); - SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS+"hasEndInterval"), TEMPORAL_InstantHasEndInterval); - - } - - - - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/indexing/external/ExternalIndexMain.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/ExternalIndexMain.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/ExternalIndexMain.java deleted file mode 100644 index c4e55be..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/ExternalIndexMain.java +++ /dev/null @@ -1,219 +0,0 @@ -package mvm.rya.indexing.external; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import java.io.File; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.AccumuloRyaDAO; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet; -import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; -import mvm.rya.rdftriplestore.RdfCloudTripleStore; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.openrdf.query.BindingSet; -import org.openrdf.query.QueryLanguage; -import org.openrdf.query.QueryResultHandlerException; -import org.openrdf.query.TupleQueryResultHandler; -import org.openrdf.query.TupleQueryResultHandlerException; -import org.openrdf.repository.sail.SailRepository; -import org.openrdf.repository.sail.SailRepositoryConnection; -import org.openrdf.sail.Sail; - -import com.beust.jcommander.internal.Maps; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -public class ExternalIndexMain { - - private static String userStr = ""; - private static String passStr = ""; - - private static String instStr = ""; - private static String zooStr = ""; - - private static String tablePrefix = ""; - - private static String AUTHS = ""; - - public static void main(String[] args) throws Exception { - Preconditions.checkArgument(args.length == 6, "java " + ExternalIndexMain.class.getCanonicalName() - + " sparqlFile cbinstance cbzk cbuser cbpassword rdfTablePrefix."); - - final String sparqlFile = args[0]; - - instStr = args[1]; - zooStr = args[2]; - userStr = args[3]; - passStr = args[4]; - tablePrefix = args[5]; - - String queryString = FileUtils.readFileToString(new File(sparqlFile)); - - - // Look for Extra Indexes - Instance inst = new ZooKeeperInstance(instStr, zooStr); - Connector c = inst.getConnector(userStr, passStr.getBytes()); - - System.out.println("Searching for Indexes"); - Map<String, String> indexTables = Maps.newLinkedHashMap(); - for (String table : c.tableOperations().list()) { - if (table.startsWith(tablePrefix + "INDEX_")) { - Scanner s = c.createScanner(table, new Authorizations()); - s.setRange(Range.exact(new Text("~SPARQL"))); - for (Entry<Key, Value> e : s) { - indexTables.put(table, e.getValue().toString()); - } - } - } - - List<ExternalTupleSet> index = Lists.newArrayList(); - - if (indexTables.isEmpty()) { - System.out.println("No Index found"); - } else { - for (String table : indexTables.keySet()) { - String indexSparqlString = indexTables.get(table); - System.out.println("====================== INDEX FOUND ======================"); - System.out.println(" table : " + table); - System.out.println(" sparql : "); - System.out.println(indexSparqlString); - - index.add(new AccumuloIndexSet(indexSparqlString, c, table)); - } - } - - // Connect to Rya - Sail s = getRyaSail(); - SailRepository repo = new SailRepository(s); - repo.initialize(); - - // Perform Query - - CountingTupleQueryResultHandler count = new CountingTupleQueryResultHandler(); - - SailRepositoryConnection conn; - if (index.isEmpty()) { - conn = repo.getConnection(); - - } else { - ExternalProcessor processor = new ExternalProcessor(index); - - Sail processingSail = new ExternalSail(s, processor); - SailRepository smartSailRepo = new SailRepository(processingSail); - smartSailRepo.initialize(); - - conn = smartSailRepo.getConnection(); - } - - startTime = System.currentTimeMillis(); - lastTime = startTime; - System.out.println("Query Started"); - conn.prepareTupleQuery(QueryLanguage.SPARQL, queryString).evaluate(count); - - System.out.println("Count of Results found : " + count.i); - System.out.println("Total query time (s) : " + (System.currentTimeMillis() - startTime) / 1000.); - } - - static long lastTime = 0; - static long startTime = 0; - - private static class CountingTupleQueryResultHandler implements TupleQueryResultHandler { - public int i = 0; - - @Override - public void handleBoolean(boolean value) throws QueryResultHandlerException { - } - - @Override - public void handleLinks(List<String> linkUrls) throws QueryResultHandlerException { - } - - @Override - public void startQueryResult(List<String> bindingNames) throws TupleQueryResultHandlerException { - System.out.println("First Result Recieved (s) : " + (System.currentTimeMillis() - startTime) / 1000.); - } - - @Override - public void endQueryResult() throws TupleQueryResultHandlerException { - } - - @Override - public void handleSolution(BindingSet bindingSet) throws TupleQueryResultHandlerException { - i++; - if (i % 10 == 0) { - long mark = System.currentTimeMillis(); - System.out.println("Count : " + i + ". Time (s) : " + (mark - lastTime) / 1000.); - lastTime = mark; - } - - } - - } - - private static Configuration getConf() { - - Configuration conf = new Configuration(); - - conf.set(ConfigUtils.CLOUDBASE_USER, userStr); - conf.set(ConfigUtils.CLOUDBASE_PASSWORD, passStr); - - conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instStr); - conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zooStr); - - conf.set(ConfigUtils.CLOUDBASE_AUTHS, AUTHS); - conf.setBoolean(ConfigUtils.DISPLAY_QUERY_PLAN, true); - return conf; - } - - private static Sail getRyaSail() throws AccumuloException, AccumuloSecurityException { - - Connector connector = ConfigUtils.getConnector(getConf()); - - final RdfCloudTripleStore store = new RdfCloudTripleStore(); - AccumuloRyaDAO crdfdao = new AccumuloRyaDAO(); - crdfdao.setConnector(connector); - - AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(getConf()); - conf.setTablePrefix(tablePrefix); - crdfdao.setConf(conf); - store.setRyaDAO(crdfdao); - - return store; - } -}
